异步编程 101:asyncio 进阶上篇

1,566 阅读3分钟

写在前面:

asyncio 初学者可能较难理解,可以结合我前面的几篇文章一起食用:

0x01 本文简介

原视频为 PyCon2019上的一场技术分享,作者是 Spotify的工程师, 通过一个案例, 介绍了 asyncio 的一些 best practice。

0x02 初始化 setup

concurrently publish messages

并发地 publish message:注意图中高亮的那一段,这里用的不是await queue.put(msg),这是因为await会阻塞while循环(参考前面的一篇文章:异步编程 101:asyncio中的 for 循环),也就是说,要等queue.put(msg)完成了,下一趟才会开始。而asyncio.create_task()会马上 "fire" 并且立即返回,你可以理解为fire and forget machinism

如果你没时间看异步编程 101:asyncio中的 for 循环,我简要回顾一下:await做的事情是把当前的协程挂起,把控制权交给事件循环,以便于事件循环有其他协程可以调度时,接着运行其他协程。但是对于执行await的这个协程而言,它是被阻塞的。这个例子中,publish()中的while循环是一个整体。

concurrently consume messages

这里使用msg = await queue.get()是make sense 的,因为你得先得到 message 然后才能接着做其他事情。而后面的restart_host则用create_task,因为我们不想对他await(等待它完成)而阻塞了整个 while Ture循环。

concurrent work

收到 message 之后,除了restart_host()之外,我们可能还需要做一些其他的任务,比如持久化保存message

这只需要在consume方法里面再添加一个create_task()

block when needed

然而有时候我们是希望异步任务能够serial执行的。如果要把restart_host()的逻辑改一下:先获取上次重启时间,然后判断上次重启时间是不是大于7天,如果是,再 restart_host()。这里的last_restart_date()restart_host()是有明确先后顺序的。

但是我们又不想这里的线性执行影响后面的 message 获取,很简单,只要把这个逻辑封装成一个协程,然后create_task()就行。

0x03 cleanup

需要对message ack,这样producer才不会重新发送。所以现在处理消息的逻辑如下:

需要保证:saverestart_host全部完成之前,才能cleanup

使用await是能够 work 的,但是性能肯定不够。

所以asyncio.gather()就派上用场了:这里把save()restart_host()两个协程交给gather,并传给它一个callback,等两个任务全部完成之后调用callback函数,也就是cleanup()

如果不想用 callback,也可以直接await gather,这样的 code 更加 clean:

最后把程序跑一下,图中不同颜色表示的是同一个 message:

  • 获取 message 是没有阻塞的
  • saverestart_host全部结束之后,才ack message。

0x04 graceful shutdowns

publishconsume组合起来,得到最后的main()

0x05 总结一下

  1. Asynchronous != concurrent

不是说你在原来代码的基础上加上asyncawait就能获得并发性能的,很可能你的异步代码还是 sequantial 的。

  1. Serial != blocking

一些有先后顺序的操作,不意味着就一定是要 block 的。比如先做A 在做 B,在等待 A 完成的过程中,我可以抽出时间做 C。放到本文的例子来说,我需要先saverestart_host才能cleanup,但我可以在等待saverestart_host的时候继续做其他事情:把控制权交给主事件循环,接着运行其他协程。

  1. 回顾一下知识点
  • await的作用是什么?
  • asyncio.create_task()
  • asyncio.gather()

如果你像我一样真正热爱计算机科学,喜欢研究底层逻辑,欢迎关注我的微信公众号: