我们知道Node.js里充满着大量的异步, 后来出现了Promise以及async/await来解决"callback hell"的问题。我们就来看看promise以及async/await如何简化JS并发代码的编写, 最后再给出一份实现相同功能的Go代码。
问题
代码开发中经常会做的一件事就是去请求一个api, 并可能进一步根据api返回结果去获取访问新的接口。 这里我们构造一个问题:获取https://cnodejs.org/ 前10个主题的id、title、date、作者昵称以及第一个回复者的昵称。 cnodejs提供了api, https://cnodejs.org/api 这里的前两个接口就能满足我们的要求。 首先用https://cnodejs.org/api/v1/topics 接口获取到前10个topics, 然后取出每个topic的id去访问get /topic/:id 主题详情
接口,
里面可以获取到回复数据。
简单实现
发起网络请求有很多方法, 我们这里采用axios库, 有几个好处, 其中包括同时支持Node.js和Browser。
我们直接用“最先进”的async/await来实现一个版本:
1const axios = require("axios"); 2 3async function getFirst10TopicsIncludeFirstReplyAuthor() { 4 const response = await axios.get( 5 "https://cnodejs.org/api/v1/topics?limit=10" 6 ); 7 const json = response.data; 8 const first10 = json.data.map(topic => { 9 return {10 id: topic.id,11 title: topic.title,12 date: topic.create_at,13 author: topic.author.loginname14 };15 });1617 for (let topic of first10) {18 const response = await axios.get(19 `https://cnodejs.org/api/v1/topic/${topic.id}`20 );21 const json = response.data;22 const firstReply = json.data.replies[0];23 topic.firstReplyAuthor = firstReply && firstReply.author.loginname;24 }2526 return first10;27}2829getFirst10TopicsIncludeFirstReplyAuthor().then(data => console.log(data));
并发
上述代码简单直接, 用了async/await, 异步代码看上去基本上是同步的, 很直观易懂。 先发起一个请求, 获取10个topics的信息, 然后针对每个topic发起一个请求, 去获取第一条回复数据,最后把数据拼凑在一起返回。 由于后面的请求需要第一个请求返回的id, 因此必须等到第一个请求回来才可以发送后面的请求, 这块没有任何问题。 但是后面的10个请求完全是独立的, 因此可以并发请求,这样能大大缩短时间。比如每个请求需要花费1s, 则上述代码总共需要花费 1(第一个请求) + 10(后面10个请求) = 11s
,
而如果将第二步的请求完全并发则只需要1(第一个请求) + 1(后面10个请求同时请求) = 2s
!!!
由于网络请求受网速影响很大不利于我们精确分析问题, 也避免大量的请求给Cnodejs服务造成影响, 我们在本地用setTimout
模拟网络请求花费的时间。
上述代码在并发性上跟下面代码基本等价:
1// 模拟一次api网络请求花费1s 2function mockAPI(result, time = 1000) { 3 return new Promise((resolve, reject) => { 4 setTimeout(() => { 5 resolve(result); 6 }, time); 7 }); 8} 910async function get10Topics() {11 const t1 = Date.now();12 const result = [];13 const total = await mockAPI(10);14 for (let i = 1; i <= total; i += 1) {15 const r = await mockAPI(i);16 result.push(r);17 }18 const t2 = Date.now();19 console.log(`total cost: ${t2 - t1}ms.`);20 return result;21}2223get10Topics().then(data => console.log(data));
执行之后发现, 确实在11s左右:
1➜ test-js git:(master) ✗ node p1.js2total cost: 11037ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
Promise.all可以同时发起多个Promise,等到所有Promise都完成了之后返回一个数组, 包含每个Promise的结果。
1// 模拟一次api网络请求花费1s 2function mockAPI(result, time = 1000) { 3 return new Promise((resolve, reject) => { 4 setTimeout(() => { 5 resolve(result); 6 }, time); 7 }); 8} 910async function get10Topics2() {11 const t1 = Date.now();12 const total = await mockAPI(10);13 const promises = [];14 for (let i = 1; i <= total; i += 1) {15 promises.push(mockAPI(i));16 }17 const result = await Promise.all(promises)18 const t2 = Date.now();19 console.log(`total cost: ${t2 - t1}ms.`);20 return result;21}2223get10Topics2().then(data => console.log(data));
时间正如我们说的, 缩短成了2s!
1➜ test-js git:(master) ✗ node p2.js2total cost: 2005ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
限流
上面第二种方法已经大大提高率效率, 而且请求数越多, 提高的效率越多。 前面的分析可以得出, 如果是获取前100个topics, 第一种串行的方法需要101s, 而第二种还是2s!!!
仔细想想你会发现哪里不对, 那就是第二种方法“太并发”了!10个请求可能还好, 如果同时并发100个请求, 那对服务器就会造成一定的影响, 如果是1000个,10000个, 那问题就更大了, 甚至到了一定程度, 会超过操作系统允许打开的连接数, 对客户端本身也会有很大的影响。
所以我们需要限制最大并发数,比如我们限制最大并发数为3, 则10个请求大概是3个3个一组, 总共会有4组(最后一组只有1个), 总共时间是5s, 这也比11s提高了50%多。一种实现方式如下:
1async function get10Topics3() { 2 const t1 = Date.now(); 3 const total = await mockAPI(10); 4 const MAX_CURRENCY = 3; 5 const result = []; 6 for (let i = 1; i <= total; i += MAX_CURRENCY) { 7 const promises = []; 8 for (let j = i; j < i + MAX_CURRENCY && j <= total; j += 1) { 9 promises.push(mockAPI(j));10 }11 const r = await Promise.all(promises);12 result.push(...r);13 }14 const t2 = Date.now();15 console.log(`total cost: ${t2 - t1}ms.`);16 return result;17}1819get10Topics3().then(data => console.log(data));
看一下结果:
1➜ test-js git:(master) ✗ node p3.js2total cost: 5012ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
还有什么问题么?
One More Step
上面的实现方法, 既利用了并发, 又对并发做了一定限制保证不至于把系统资源耗尽,似乎是完美的。 但是如果每个请求所需要的时间不一样呢?get10Topics3
的实现方式是每三个一组, 等着三个都完成了, 再进行下一组请求。 那么如果三个任务中, 有一个花费的时间比较多, 另外两个任务完成了之后, 本来可以继续开始新的任务的, 现在必须等着第三个任务完成了才能开始新的任务。甚至如果三个任务需要的时间都不一样, 那么第一个需要等第二个和第三个, 第二个需要等第三个,
整个系统就被最慢的那个任务拖累了。 比如第一个任务需要1s, 第二个任务需要2s, 第三个任务需要3s, 则get10Topics3
每组任务需要3s, 三组任务需要3 * 3 = 9s
, 最后一组那个任务只需要1s, 总共需要1 + 3 + 3 + 3 + 1 = 11s
, 当然这也比完全串行需要的时间1 + 1 + 2 + 3 + 1 + 2 + 3 + 1 + 2 + 3 + 1 = 20s
要快不少。
1// 模拟一次api网络请求花费特定时间 2function mockAPI(result, time = 1000) { 3 console.log(result, time); 4 return new Promise((resolve, reject) => { 5 setTimeout(() => { 6 resolve(result); 7 }, time); 8 }); 9}1011async function get10Topics4() {12 const t1 = Date.now();13 const total = await mockAPI(10);14 const MAX_CURRENCY = 3;15 const result = [];16 for (let i = 1; i <= total; i += MAX_CURRENCY) {17 const promises = [];18 for (let j = i; j < i + MAX_CURRENCY && j <= total; j += 1) {19 const costtime = j % 3 === 0 ? 3 : j % 3; // 第一个任务1s, 第二个2是, 第三个3s...20 promises.push(mockAPI(j, costtime * 1000));21 }22 const t3 = Date.now();23 const r = await Promise.all(promises);24 const t4 = Date.now();25 console.log(`promise ${i} cost: ${t4 - t3}ms`);26 result.push(...r);27 }28 const t2 = Date.now();29 console.log(`total cost: ${t2 - t1}ms.`);30 return result;31}3233get10Topics4().then(data => console.log(data));
运行结果:
1➜ test-js git:(master) ✗ node p4.js 210 1000 31 1000 42 2000 53 3000 6promise 1 cost: 3002ms 74 1000 85 2000 96 300010promise 4 cost: 2999ms117 1000128 2000139 300014promise 7 cost: 3002ms1510 100016promise 10 cost: 1005ms17total cost: 11030ms.18[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
大家很容易想到, 把所需时间短的任务放一起并发执行, 这样就能减少互相等待的时间。比如把4个1s的放一起, 3个2s的放一起, 3个3s的放一起, 则总共需要时间为:1 + 1 + 2 + 3 + 1 = 8s
, 又提高了一些。但是, 一方面我们在实际任务开始并完成之前, 并不知道具体哪个任务需要花多长时间, 另一方面不可能刚好总有花同样时间的任务能凑成一组, 甚至极端情况下, 每个任务所花时间都不一样。
仔细想想, 我们只需要这么做: 构建一个任务池, 一开始并发三个任务, 每个任务回来之后不用等其他两个任务, 直接看一下任务池还有任务么, 有的话就直接去做,直到所有任务都完成即可。
由于Node.js里面没有信号量来同步各个“线程”之间的工作, 这里用了递归并操作公共变量的方式实现, 如果读者有更好的方式可以给作者留言。注意, “并发地修改共享变量是万恶之源, 有data race的问题, 好在JS里面是单线程, 所以没有这个问题。
1// 模拟一次api网络请求花费特定时间 2function mockAPI(result, time = 1000) { 3 console.log(result, time); 4 return new Promise((resolve, reject) => { 5 setTimeout(() => { 6 resolve(result); 7 }, time); 8 }); 9}1011const start = Date.now();12function worker(tasks, result) {13 const task = tasks.shift();14 if (!task) {15 // 任务结束16 return;17 }18 const costtime = task % 3 === 0 ? 3 : task % 3; // 第一个任务1s, 第二个2是, 第三个3s...19 return mockAPI(task, costtime * 1000).then(r => {20 console.log(`${r} completes at time: ${Date.now() - start}`);21 result.push(r);22 return worker(tasks, result);23 });24}2526async function get10Topics5() {27 const t1 = Date.now();28 const total = await mockAPI(10);29 const MAX_CURRENCY = 3;30 const result = [];3132 const tasks = [];33 for (let i = 1; i <= total; i += 1) {34 tasks.push(i);35 }3637 const promises = [];38 for (let i = 0; i < MAX_CURRENCY; i += 1) {39 promises.push(worker(tasks, result));40 }4142 const r = await Promise.all(promises);43 const t2 = Date.now();44 console.log(`total cost: ${t2 - t1}ms.`);45 return result;46}4748get10Topics5().then(data => console.log(data));
运行代码可以看到结果:
1➜ test-js git:(master) ✗ node p5.js 210 1000 31 1000 42 2000 53 3000 61 completes at time: 2s, by worker0 74 1000 82 completes at time: 3s, by worker1 95 2000104 completes at time: 3s, by worker0116 3000123 completes at time: 4s, by worker2137 1000145 completes at time: 5s, by worker1158 2000167 completes at time: 5s, by worker2179 3000186 completes at time: 6s, by worker01910 1000208 completes at time: 7s, by worker12110 completes at time: 7s, by worker0229 completes at time: 8s, by worker223total cost: 8032ms.24[ 1, 2, 4, 3, 5, 7, 6, 8, 10, 9 ]
我们可以看到,一开始同时开启了worker0, worker1, worker2
三个“线程”去做事, worker0
在第2s(因为第1s是调用第一个api)完成了task1,它并没有等待, 而是继续开始做task4。然后又过了1s, worker1完成了task2然后去开始做task5, 而此刻worker0完成了task4并开始去做task6, 又过了1s, worker2才完成了task3然后去做task7……可以看到每个worker都在争先恐后地完成任务,
直到所有任务全部完成, 总共花了8s时间。
重新实现并发访问API
这里我将最早串行访问API接口的代码改成并发执行, 没有做限流, 读者可根绝前文分析修改成限流版本,就当留作小练习吧。
1const axios = require("axios"); 2 3function getFirst10TopicsIncludeFirstReplyAuthor() { 4 return axios 5 .get("https://cnodejs.org/api/v1/topics?limit=10") 6 .then(function(response) { 7 const json = response.data; 8 const first10 = json.data.map(topic => { 9 return {10 id: topic.id,11 title: topic.title,12 date: topic.create_at,13 author: topic.author.loginname14 };15 });1617 const promises = first10.map(data => {18 return axios19 .get(`https://cnodejs.org/api/v1/topic/${data.id}`)20 .then(response => {21 const json = response.data;22 const firstReply = json.data.replies[0];23 return {24 id: json.data.id,25 firstReplyAuthor: firstReply && firstReply.author.loginname26 };27 });28 });29 return Promise.all(promises).then(rs => {30 const map = rs.reduce((acc, e) => {31 acc.set(e.id, e);32 return acc;33 }, new Map());34 for (let topic of first10) {35 topic.firstReplyAuthor = map.get(topic.id).firstReplyAuthor;36 }37 return first10;38 });39 })40 .catch(function(error) {41 console.log(error);42 });43}4445getFirst10TopicsIncludeFirstReplyAuthor().then(data => console.log(data));
Go语言实现
其实Go跟Promise没啥关系, 只是最近刚好在用Go语言做东西, 因此拿来对比一下。Go里面很容易实现限流的功能,这里直接贴上代码,不做过多分析。
1package main 2 3import ( 4 "fmt" 5 "time" 6) 7 8const start = time.Now().Unix() 910func mockAPI(result int, duration time.Duration) int {11 fmt.Println(result, duration)12 time.Sleep(duration)13 return result14}1516func worker(id int, jobs <-chan int, result chan<- int) {17 for job := range jobs {18 t := job % 319 if t == 0 {20 t = 321 }22 r := mockAPI(job, (time.Duration)(t)*time.Second)23 diff := time.Now().Unix() - start24 fmt.Printf("%d completes at time: %ds, by worker%d\n", r, diff, id)25 result <- r26 }27}28func main() {29 t1 := time.Now().Unix()3031 jobs := make(chan int, 10)32 result := make(chan int, 10)33 total := mockAPI(10, 1*time.Second)3435 const MaxCurrency = 336 for i := 0; i < MaxCurrency; i++ {37 go worker(i, jobs, result)38 }3940 for i := 1; i <= total; i++ {41 jobs <- i42 }43 close(jobs)4445 rs := make([]int, total)46 for i := 0; i < total; i++ {47 r := <-result48 rs[i] = r49 }5051 t2 := time.Now().Unix()52 fmt.Printf("total cost: %ds.\n", (t2 - t1))53 fmt.Println(rs)54}
执行输出如下:
1➜ chap8 go run currency-rate-limit2.go 210 1s 31 1s 42 2s 53 3s 61 completes at time: 2s, by worker0 74 1s 84 completes at time: 3s, by worker0 92 completes at time: 3s, by worker2105 2s116 3s123 completes at time: 4s, by worker1137 1s147 completes at time: 5s, by worker1158 2s165 completes at time: 5s, by worker0179 3s186 completes at time: 6s, by worker21910 1s208 completes at time: 7s, by worker12110 completes at time: 7s, by worker2229 completes at time: 8s, by worker023total cost: 8s.24[1 4 2 3 7 5 6 8 10 9]
参考资料
-
https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise
-
https://yar999.gitbooks.io/gopl-zh/content/ch8/ch8-06.html