用Promise实现并发 vs Go goroutine

2,408 阅读8分钟
原文链接: mp.weixin.qq.com

我们知道Node.js里充满着大量的异步, 后来出现了Promise以及async/await来解决"callback hell"的问题。我们就来看看promise以及async/await如何简化JS并发代码的编写, 最后再给出一份实现相同功能的Go代码。

问题

代码开发中经常会做的一件事就是去请求一个api, 并可能进一步根据api返回结果去获取访问新的接口。 这里我们构造一个问题:获取https://cnodejs.org/ 前10个主题的id、title、date、作者昵称以及第一个回复者的昵称。 cnodejs提供了api, https://cnodejs.org/api 这里的前两个接口就能满足我们的要求。 首先用https://cnodejs.org/api/v1/topics 接口获取到前10个topics, 然后取出每个topic的id去访问get /topic/:id 主题详情接口, 里面可以获取到回复数据。

简单实现

发起网络请求有很多方法, 我们这里采用axios库, 有几个好处, 其中包括同时支持Node.js和Browser。

我们直接用“最先进”的async/await来实现一个版本:

 1const axios = require("axios"); 2 3async function getFirst10TopicsIncludeFirstReplyAuthor() { 4  const response = await axios.get( 5    "https://cnodejs.org/api/v1/topics?limit=10" 6  ); 7  const json = response.data; 8  const first10 = json.data.map(topic => { 9    return {10      id: topic.id,11      title: topic.title,12      date: topic.create_at,13      author: topic.author.loginname14    };15  });1617  for (let topic of first10) {18    const response = await axios.get(19      `https://cnodejs.org/api/v1/topic/${topic.id}`20    );21    const json = response.data;22    const firstReply = json.data.replies[0];23    topic.firstReplyAuthor = firstReply && firstReply.author.loginname;24  }2526  return first10;27}2829getFirst10TopicsIncludeFirstReplyAuthor().then(data => console.log(data));

并发

上述代码简单直接, 用了async/await, 异步代码看上去基本上是同步的, 很直观易懂。 先发起一个请求, 获取10个topics的信息, 然后针对每个topic发起一个请求, 去获取第一条回复数据,最后把数据拼凑在一起返回。 由于后面的请求需要第一个请求返回的id, 因此必须等到第一个请求回来才可以发送后面的请求, 这块没有任何问题。 但是后面的10个请求完全是独立的, 因此可以并发请求,这样能大大缩短时间。比如每个请求需要花费1s, 则上述代码总共需要花费 1(第一个请求) + 10(后面10个请求) = 11s, 而如果将第二步的请求完全并发则只需要1(第一个请求) + 1(后面10个请求同时请求) = 2s!!!

由于网络请求受网速影响很大不利于我们精确分析问题, 也避免大量的请求给Cnodejs服务造成影响, 我们在本地用setTimout模拟网络请求花费的时间。

上述代码在并发性上跟下面代码基本等价:

 1// 模拟一次api网络请求花费1s 2function mockAPI(result, time = 1000) { 3  return new Promise((resolve, reject) => { 4    setTimeout(() => { 5      resolve(result); 6    }, time); 7  }); 8} 910async function get10Topics() {11  const t1 = Date.now();12  const result = [];13  const total = await mockAPI(10);14  for (let i = 1; i <= total; i += 1) {15    const r = await mockAPI(i);16    result.push(r);17  }18  const t2 = Date.now();19  console.log(`total cost: ${t2 - t1}ms.`);20  return result;21}2223get10Topics().then(data => console.log(data));

执行之后发现, 确实在11s左右:

1➜  test-js git:(master) ✗ node p1.js2total cost: 11037ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]

Promise.all可以同时发起多个Promise,等到所有Promise都完成了之后返回一个数组, 包含每个Promise的结果。

 1// 模拟一次api网络请求花费1s 2function mockAPI(result, time = 1000) { 3  return new Promise((resolve, reject) => { 4    setTimeout(() => { 5      resolve(result); 6    }, time); 7  }); 8} 910async function get10Topics2() {11  const t1 = Date.now();12  const total = await mockAPI(10);13  const promises = [];14  for (let i = 1; i <= total; i += 1) {15    promises.push(mockAPI(i));16  }17  const result = await Promise.all(promises)18  const t2 = Date.now();19  console.log(`total cost: ${t2 - t1}ms.`);20  return result;21}2223get10Topics2().then(data => console.log(data));

时间正如我们说的, 缩短成了2s!

1➜  test-js git:(master) ✗ node p2.js2total cost: 2005ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]

限流

上面第二种方法已经大大提高率效率, 而且请求数越多, 提高的效率越多。 前面的分析可以得出, 如果是获取前100个topics, 第一种串行的方法需要101s, 而第二种还是2s!!!

仔细想想你会发现哪里不对, 那就是第二种方法“太并发”了!10个请求可能还好, 如果同时并发100个请求, 那对服务器就会造成一定的影响, 如果是1000个,10000个, 那问题就更大了, 甚至到了一定程度, 会超过操作系统允许打开的连接数, 对客户端本身也会有很大的影响。

所以我们需要限制最大并发数,比如我们限制最大并发数为3, 则10个请求大概是3个3个一组, 总共会有4组(最后一组只有1个), 总共时间是5s, 这也比11s提高了50%多。一种实现方式如下:

 1async function get10Topics3() { 2  const t1 = Date.now(); 3  const total = await mockAPI(10); 4  const MAX_CURRENCY = 3; 5  const result = []; 6  for (let i = 1; i <= total; i += MAX_CURRENCY) { 7    const promises = []; 8    for (let j = i; j < i + MAX_CURRENCY && j <= total; j += 1) { 9      promises.push(mockAPI(j));10    }11    const r = await Promise.all(promises);12    result.push(...r);13  }14  const t2 = Date.now();15  console.log(`total cost: ${t2 - t1}ms.`);16  return result;17}1819get10Topics3().then(data => console.log(data));

看一下结果:

1➜  test-js git:(master) ✗ node p3.js2total cost: 5012ms.3[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]

还有什么问题么?

One More Step

上面的实现方法, 既利用了并发, 又对并发做了一定限制保证不至于把系统资源耗尽,似乎是完美的。 但是如果每个请求所需要的时间不一样呢?get10Topics3的实现方式是每三个一组, 等着三个都完成了, 再进行下一组请求。 那么如果三个任务中, 有一个花费的时间比较多, 另外两个任务完成了之后, 本来可以继续开始新的任务的, 现在必须等着第三个任务完成了才能开始新的任务。甚至如果三个任务需要的时间都不一样, 那么第一个需要等第二个和第三个, 第二个需要等第三个, 整个系统就被最慢的那个任务拖累了。 比如第一个任务需要1s, 第二个任务需要2s, 第三个任务需要3s, 则get10Topics3每组任务需要3s, 三组任务需要3 * 3 = 9s, 最后一组那个任务只需要1s, 总共需要1 + 3 + 3 + 3 + 1 = 11s, 当然这也比完全串行需要的时间1 + 1 + 2 + 3 + 1 + 2 + 3 + 1 + 2 + 3 + 1 = 20s要快不少。

 1// 模拟一次api网络请求花费特定时间 2function mockAPI(result, time = 1000) { 3  console.log(result, time); 4  return new Promise((resolve, reject) => { 5    setTimeout(() => { 6      resolve(result); 7    }, time); 8  }); 9}1011async function get10Topics4() {12  const t1 = Date.now();13  const total = await mockAPI(10);14  const MAX_CURRENCY = 3;15  const result = [];16  for (let i = 1; i <= total; i += MAX_CURRENCY) {17    const promises = [];18    for (let j = i; j < i + MAX_CURRENCY && j <= total; j += 1) {19      const costtime = j % 3 === 0 ? 3 : j % 3; // 第一个任务1s, 第二个2是, 第三个3s...20      promises.push(mockAPI(j, costtime * 1000));21    }22    const t3 = Date.now();23    const r = await Promise.all(promises);24    const t4 = Date.now();25    console.log(`promise ${i} cost: ${t4 - t3}ms`);26    result.push(...r);27  }28  const t2 = Date.now();29  console.log(`total cost: ${t2 - t1}ms.`);30  return result;31}3233get10Topics4().then(data => console.log(data));

运行结果:

 1➜  test-js git:(master) ✗ node p4.js 210 1000 31 1000 42 2000 53 3000 6promise 1 cost: 3002ms 74 1000 85 2000 96 300010promise 4 cost: 2999ms117 1000128 2000139 300014promise 7 cost: 3002ms1510 100016promise 10 cost: 1005ms17total cost: 11030ms.18[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]

大家很容易想到, 把所需时间短的任务放一起并发执行, 这样就能减少互相等待的时间。比如把4个1s的放一起, 3个2s的放一起, 3个3s的放一起, 则总共需要时间为:1 + 1 + 2 + 3 + 1 = 8s, 又提高了一些。但是, 一方面我们在实际任务开始并完成之前, 并不知道具体哪个任务需要花多长时间, 另一方面不可能刚好总有花同样时间的任务能凑成一组, 甚至极端情况下, 每个任务所花时间都不一样。

仔细想想, 我们只需要这么做: 构建一个任务池, 一开始并发三个任务, 每个任务回来之后不用等其他两个任务, 直接看一下任务池还有任务么, 有的话就直接去做,直到所有任务都完成即可。

由于Node.js里面没有信号量来同步各个“线程”之间的工作, 这里用了递归并操作公共变量的方式实现, 如果读者有更好的方式可以给作者留言。注意, “并发地修改共享变量是万恶之源, 有data race的问题, 好在JS里面是单线程, 所以没有这个问题。

 1// 模拟一次api网络请求花费特定时间 2function mockAPI(result, time = 1000) { 3  console.log(result, time); 4  return new Promise((resolve, reject) => { 5    setTimeout(() => { 6      resolve(result); 7    }, time); 8  }); 9}1011const start = Date.now();12function worker(tasks, result) {13  const task = tasks.shift();14  if (!task) {15    // 任务结束16    return;17  }18  const costtime = task % 3 === 0 ? 3 : task % 3; // 第一个任务1s, 第二个2是, 第三个3s...19  return mockAPI(task, costtime * 1000).then(r => {20    console.log(`${r} completes at time: ${Date.now() - start}`);21    result.push(r);22    return worker(tasks, result);23  });24}2526async function get10Topics5() {27  const t1 = Date.now();28  const total = await mockAPI(10);29  const MAX_CURRENCY = 3;30  const result = [];3132  const tasks = [];33  for (let i = 1; i <= total; i += 1) {34    tasks.push(i);35  }3637  const promises = [];38  for (let i = 0; i < MAX_CURRENCY; i += 1) {39    promises.push(worker(tasks, result));40  }4142  const r = await Promise.all(promises);43  const t2 = Date.now();44  console.log(`total cost: ${t2 - t1}ms.`);45  return result;46}4748get10Topics5().then(data => console.log(data));

运行代码可以看到结果:

 1➜  test-js git:(master) ✗ node p5.js 210 1000 31 1000 42 2000 53 3000 61 completes at time: 2s, by worker0 74 1000 82 completes at time: 3s, by worker1 95 2000104 completes at time: 3s, by worker0116 3000123 completes at time: 4s, by worker2137 1000145 completes at time: 5s, by worker1158 2000167 completes at time: 5s, by worker2179 3000186 completes at time: 6s, by worker01910 1000208 completes at time: 7s, by worker12110 completes at time: 7s, by worker0229 completes at time: 8s, by worker223total cost: 8032ms.24[ 1, 2, 4, 3, 5, 7, 6, 8, 10, 9 ]

我们可以看到,一开始同时开启了worker0, worker1, worker2三个“线程”去做事,  worker0在第2s(因为第1s是调用第一个api)完成了task1,它并没有等待, 而是继续开始做task4。然后又过了1s, worker1完成了task2然后去开始做task5, 而此刻worker0完成了task4并开始去做task6, 又过了1s, worker2才完成了task3然后去做task7……可以看到每个worker都在争先恐后地完成任务, 直到所有任务全部完成, 总共花了8s时间。

重新实现并发访问API

这里我将最早串行访问API接口的代码改成并发执行, 没有做限流, 读者可根绝前文分析修改成限流版本,就当留作小练习吧。

 1const axios = require("axios"); 2 3function getFirst10TopicsIncludeFirstReplyAuthor() { 4  return axios 5    .get("https://cnodejs.org/api/v1/topics?limit=10") 6    .then(function(response) { 7      const json = response.data; 8      const first10 = json.data.map(topic => { 9        return {10          id: topic.id,11          title: topic.title,12          date: topic.create_at,13          author: topic.author.loginname14        };15      });1617      const promises = first10.map(data => {18        return axios19          .get(`https://cnodejs.org/api/v1/topic/${data.id}`)20          .then(response => {21            const json = response.data;22            const firstReply = json.data.replies[0];23            return {24              id: json.data.id,25              firstReplyAuthor: firstReply && firstReply.author.loginname26            };27          });28      });29      return Promise.all(promises).then(rs => {30        const map = rs.reduce((acc, e) => {31          acc.set(e.id, e);32          return acc;33        }, new Map());34        for (let topic of first10) {35          topic.firstReplyAuthor = map.get(topic.id).firstReplyAuthor;36        }37        return first10;38      });39    })40    .catch(function(error) {41      console.log(error);42    });43}4445getFirst10TopicsIncludeFirstReplyAuthor().then(data => console.log(data));

Go语言实现

其实Go跟Promise没啥关系, 只是最近刚好在用Go语言做东西, 因此拿来对比一下。Go里面很容易实现限流的功能,这里直接贴上代码,不做过多分析。

 1package main 2 3import ( 4    "fmt" 5    "time" 6) 7 8const start = time.Now().Unix() 910func mockAPI(result int, duration time.Duration) int {11    fmt.Println(result, duration)12    time.Sleep(duration)13    return result14}1516func worker(id int, jobs <-chan int, result chan<- int) {17    for job := range jobs {18        t := job % 319        if t == 0 {20            t = 321        }22        r := mockAPI(job, (time.Duration)(t)*time.Second)23        diff := time.Now().Unix() - start24        fmt.Printf("%d completes at time: %ds, by worker%d\n", r, diff, id)25        result <- r26    }27}28func main() {29    t1 := time.Now().Unix()3031    jobs := make(chan int, 10)32    result := make(chan int, 10)33    total := mockAPI(10, 1*time.Second)3435    const MaxCurrency = 336    for i := 0; i < MaxCurrency; i++ {37        go worker(i, jobs, result)38    }3940    for i := 1; i <= total; i++ {41        jobs <- i42    }43    close(jobs)4445    rs := make([]int, total)46    for i := 0; i < total; i++ {47        r := <-result48        rs[i] = r49    }5051    t2 := time.Now().Unix()52    fmt.Printf("total cost: %ds.\n", (t2 - t1))53    fmt.Println(rs)54}

执行输出如下:

 1➜  chap8 go run currency-rate-limit2.go 210 1s 31 1s 42 2s 53 3s 61 completes at time: 2s, by worker0 74 1s 84 completes at time: 3s, by worker0 92 completes at time: 3s, by worker2105 2s116 3s123 completes at time: 4s, by worker1137 1s147 completes at time: 5s, by worker1158 2s165 completes at time: 5s, by worker0179 3s186 completes at time: 6s, by worker21910 1s208 completes at time: 7s, by worker12110 completes at time: 7s, by worker2229 completes at time: 8s, by worker023total cost: 8s.24[1 4 2 3 7 5 6 8 10 9]

参考资料

  • https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise

  • https://yar999.gitbooks.io/gopl-zh/content/ch8/ch8-06.html