tornado 中的异步调用模式

682 阅读4分钟

fundamental concepts

  • 同步 调用者等待调用结果的返回

  • 异步 调用者不等待调用结果的返回,而是通过间隔轮询、通知等方式得知结果

  • 阻塞 调用方式影响后续指令的执行

  • 非阻塞 调用方式不影响后续指令的执行


一般来说,阻塞是绝对的,非阻塞则是相对的,因为任何指令或调用的执行都要占用 CPU 周期,或网络,或 IO。非阻塞只是说调用或者资源消耗不影响后续逻辑执行,他们经得起等待。非阻塞往往和异步一起出现。

tornado 是一个异步非阻塞 httpserver,同时也是一个 web framework。

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user。

在 tornado 中异步和非阻塞是通过 ioloop 和 Future 实现的,Future 是借助 python 的协程来实现非阻塞调用。tornado 提供了不同的异步调用形式来适配不同的调用场景,本文的目的就是为了说明不同形式是如何使用的以及它们的差别。

为了演示方便,这里使用了一个公共模块 util.py 来提供 ioloop 的启动和销毁,下文不做特殊说明皆是如此。

# util.py 
    import tornado.ioloop
    COUNTER = 0
    def stop_loop(times):
        global COUNTER
        COUNTER += 1
        if COUNTER == times:
            tornado.ioloop.IOLoop.instance().stop()
            print('====> ioloop end')
            def start_loop():
        print('====> ioloop start')
        ioloop = tornado.ioloop.IOLoop.current()
        ioloop.start()
        

最常用的调用方式

在 tornado 中最普遍的使用方式是把函数的调用结果封装成 Future,利用 ioloop 执行并异步获得结果。

# -*- coding:utf-8 -*-
    from tornado.httpclient import AsyncHTTPClient
    from tornado.concurrent import Future
    from tornado import gen
    from util import stop_loop, start_loop
    # fetch 返回的是 future
    def asyn_fetch_future(url):
        http_client = AsyncHTTPClient()
        return http_client.fetch(url)
        def asyn_fetch_future_callback(future):
        result = future.result()
        print('future_callback')
        print(result.request.url, result.code, result.reason, result.request_time)
        stop_loop(1)
        if __name__ == '__main__':
        result_future = asyn_fetch_future('http://www.apple.com/cn/')  #1
        result_future.add_done_callback(asyn_fetch_future_callback)
        start_loop()
        

#1 处通过tornado 提供的异步 httpclient 获得一个 Future,为 Future 添加执行结果回调函数获得执行结果,Future 的执行结果也是一个 Future。

输出结果如下

====> ioloop start
future_callback
('http://www.apple.com/cn/', 200, 'OK', 0.5546278953552246)
====> ioloop end
        0.71 real         0.06 user         0.06 sys
        

这种调用方式必须手动启动 ioloop 并在调用结束之后手动销毁 ioloop,想要获得调用结果必须为 Future 添加回调。

只关心调用,不在乎结果

有时候我们并不在乎函数的调用结果,只要函数正确执行即可,ioloop 提供了 spawn_callback 来执行这样的操作:

# -*- coding:utf-8 -*-
    import tornado.gen
    import tornado.ioloop
    from util import start_loop, stop_loop
    @tornado.gen.coroutine
    def divide(x, y):
        return x / y
        if __name__ == '__main__':
        # The IOLoop will catch the exception and print a stack trace in
        # the logs. Note that this doesn't look like a normal call, since
        # we pass the function object to be called by the IOLoop.
        tornado.ioloop.IOLoop.current().spawn_callback(divide, 1, 0)
        tornado.ioloop.IOLoop.current().add_timeout(
            tornado.ioloop.time.time() + 1, stop_loop, 1)
        start_loop()
        

借助 spawn_callback 可以直接执行函数调用,但是依然需要手动处理 ioloop 的开闭。

一次性执行

run_sync 可以自动开启 ioloop 并在函数执行结束之后关闭 ioloop,此种调用在执行一次性操作时非常有用,比如要对数据库进行一次性修改:

@tornado.gen.coroutine
    def init_tag():
        tb_tag = Tag() #1
        for i in range(10):
            result = yield tb_tag.insert({'name': random.choice('abcdefghjklpoiuytrewq')})
            print(result)
            if __name__ == '__main__':
        tornado.ioloop.IOLoop.current().run_sync(lambda: init_tag())
        

#1 是一个 motor Mongodb collection,借助 run_sync 可以非常方便对数据库执行操作,由于run_sync 只接受一个参数,如果函数调用需要传参,可以把函数封装成 lambda。

使用 callback

如果调用需要与 callback 函数进行交互,可以利用 gen.task

# -*- coding:utf-8 -*-
    import tornado.gen
    import tornado.ioloop
    from util import start_loop, stop_loop
    @tornado.gen.coroutine
    def call_task():
        result = yield tornado.gen.Task(some_function, 1, 2)
        raise tornado.gen.Return(result)
        def fetch_coroutine_callback(future):
        print('coroutine callback ==> ', future.result())
        stop_loop(1)
        def some_function(x, y, callback=None):
        print('some_function called')
        callback(x * y)
        if __name__ == '__main__':
        future = call_task()
        future.add_done_callback(fetch_coroutine_callback)
        start_loop()
        

使用 gen.task 可以直接返回一个 Future ,被调用的函数会自动增加一个 callback 函数,用来在函数执行结束之后把结果进行回调通知。

通过 threadPool 来调用阻塞操作

#-*- coding:utf-8 -*-
    from tornado.concurrent import Future
    from tornado import gen
    import time
    from concurrent.futures import ThreadPoolExecutor
    from util import start_loop, stop_loop
    EXECUTOR = ThreadPoolExecutor(max_workers=4)
    def fetch_coroutine_callback(future):
        print('coroutine callback')
        print(future.result())
        stop_loop(2)
        def sleep_func(t):
        print('sleep_func call')
        time.sleep(t)
        return 'blocking func result'
        @gen.coroutine
    def blocking():
        result = yield EXECUTOR.submit(sleep_func, *(4, ))
        raise gen.Return(result)
        @gen.coroutine
    def not_blocking():
        future = Future()
        future.set_result('not blocking func result')
        result = yield future
        raise gen.Return(result)
        if __name__ == '__main__':
        """
        用多线程的方式借助协程实现异步调用
        """
        blocking().add_done_callback(fetch_coroutine_callback)
        not_blocking().add_done_callback(fetch_coroutine_callback)
        start_loop()
        

如果代码中有阻塞操作,可以借助 ThreadPoolExecutor 来完成异步的调用,这样把耗时的操作交给别的线程,可以使当前的线程继续执行后续的操作。这里的阻塞操作一般是 IO 或者其他系统调用。

并行执行

tornado 的 Future 是支持并行执行的,可以对多个 future 进行 yield 操作并返回多个结果。

# -*- coding:utf-8 -*-
    from tornado.httpclient import AsyncHTTPClient
    from tornado.concurrent import Future
    from tornado import gen
    from util import stop_loop, start_loop
    @gen.coroutine
    def fetch_many_coroutine(urls):
        http_client = AsyncHTTPClient()
        response1, response2 = yield [http_client.fetch(urls[0]), http_client.fetch(urls[1])]
        raise gen.Return([response1, response2])
        @gen.coroutine
    def fetch_coroutine(urls):
        http_client = AsyncHTTPClient()
        responses = yield [http_client.fetch(url) for url in urls]
        raise gen.Return(responses)
        def fetch_coroutine_callback(future):
        print('coroutine callback')
        for result in future.result():
            print(
                result.request.url,
                result.code, result.reason, result.request_time)
        stop_loop(2)
        if __name__ == '__main__':
        """
        使用gen.coroutine 可以很方便让包含 yield 的函数返回future
        """
        result_future = fetch_coroutine(['https://baidu.com', 'https://baidu.com'])
        result_future.add_done_callback(fetch_coroutine_callback)
        result_future = fetch_many_coroutine(['https://baidu.com', 'https://baidu.com'])
        result_future.add_done_callback(fetch_coroutine_callback)
        start_loop()
        

文中所有代码见 tornado-asyn,欢迎指正。