我所理解的 tornado - concurrent 部分

阅读 291
收藏 9
2017-03-13
原文链接:leohowell.com

tornado使用了协程


子程序subroutine的调用过程是很清晰的,比如A调用B,B调用C,C调用D,然后一层层返回,最后A返回,然后结束

协程coroutine中则有一个中断挂起的概念,比如说有任务A和B,A执行过程中发现自己需要被挂起或者线程发现要把A挂起,那么就挂起A去执行B,知道B被挂起然后A继续执行。如此反复。


最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

协程 - 廖雪峰的官方网站


使用协程必须要application层实现协程的调度,同时需要语言本身的支持。

tornado完成了这些。

我们来看一下tornado官方文档中使用协程的例子。

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

几乎是用同步方式来写代码,避免了callback的存在。

其实我们都知道callback不可避免,如果使用框架的时候没有显式使用callback,那么一定是这个框架做了一些工作(挂起协程重新得到执行需要callback)。

我想了一下怎么写出tornado风格的程序,也就是程序对yield的处理,这个很有意思。来看下面这段代码:

from backports_abc import Generator as GeneratorType


class Return(Exception):
    def __init__(self, value=None):
        self.value = value


def coroutine(func):
    def wrapper(*args, **kwargs):
        def _dispatch(yielded):
            if isinstance(yielded, GeneratorType):
                return _execute_yield(yielded)
            else:
                return _send(yielded)

        def _send(yielded):
            try:
                yielded = origin_gen.send(yielded)
                return _dispatch(yielded)
            except (StopIteration, Return) as e:
                return getattr(e, 'value', None)
            except Exception as error:
                print 'terrible error happened: %r' % error

        def _execute_yield(gen):
            yielded = next(gen)
            return _dispatch(yielded)

        result = func(*args, **kwargs)
        origin_gen = result
        return _execute_yield(result)

    return wrapper


def get_value2():
    return 10086


def get_value1():
    yield get_value2()


@coroutine
def test():
    value1 = yield get_value1()
    print 'got value1: %d' % value1

    value2 = yield get_value2()
    print 'got value2: %d' % value2

    raise Return(value1 == value2)


if __name__ == '__main__':
    result = test()
    print result

"""
>>> got value1: 10086
>>> got value2: 10086
>>> True
"""

主要在处理yield挂起的协程怎么继续执行,并且在包含yield的函数中实现同步的返回。

这里面有很关键的两行

yielded = next(gen)

next()是生成器的一次执行, 执行完可能等到一个结果实体,也可能还是一个生成器

yielded = origin_gen.send(yielded)

send()方法用户将挂起的协程唤醒,继续执行这个挂起的协程


现在协程的关键在哪里呢?

关键就在于什么时候执行next()

我们知道返回一个生成器是基本不会耗费什么资源的, 但是生成器执行一次就说不准了, 生成器中的代码可能是CPU密集,可能包含io,这些都会影响主线程的执行。

所以现在有一个想法,为了获得运行效率,我们要避开在执行next()的时候其中的io等待。

如何避开呢?就是把所有的io操作全部注册到ioloop上,收到ioloop通知说io事件已经完成了,请执行next()吧

这样的好处是在协程执行过程中没有io等待时间,CPU不会因为io等待被抢占。



OK, 我们来看一下tornado是怎么实现的。

显而易见的是coroutine装饰器,我们还知道raise gen.Return(...)的写法。

tornado主要有4个组件

  1. Return() 用户同步返回的特殊异常
  2. Future() 被coroutine装饰的函数/方法的返回值
  3. Runner 调度器
  4. coroutine装饰器


先看一下最简单的Return

class Return(Exception):
    def __init__(self, value=None):
        self.value = value

然后是Future类

class Future(object):
    def __init__(self):
        self.result = None
        self.exc_info = None
        self.done = False
        self.callbacks = []

    def set_result(self, result):
        self.result = result
        self.done = True
        for cb in self.callbacks:
            cb(self)

    def add_done_callback(self, fn):
        if self.done:
            fn(self)
        else:
            self.callbacks.append(fn)

上面没有什么好说的,都是字面意思

然后是coroutine装饰器

def coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()

        try:
            result = func(*args, **kwargs)
        except (StopIteration, Return) as e:
            result = getattr(e, 'value', None)
        except Exception:
            future.exc_info = sys.exc_info()
            return future
        else:
            if isinstance(result, GeneratorType):
                try:
                    yielded = next(result)
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, 'value', None))
                except Exception:
                    future.exc_info = sys.exc_info()
                else:
                    # result is generator, yielded is Future
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.result = result
        future.done = True
        return future
    return wrapper

coroutine装饰器会直接执行一次next()方法:

  1. 如果直接有返回那么被装饰的函数就直接返回了.
  2. 如果返回Future对象, 那么生成一个Runner实例来处理

那么我们看一下Runner

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False

        self.ioloop = IOLoop.instance()

        if self.handle_yield(first_yielded):
            self.run()

    def handle_yield(self, yielded):
        # yielded is definitely Future
        self.future = yielded
        if not self.future.done:
            self.ioloop.add_future(self.future, lambda f: self.run())
            return False
        return True

    def run(self):
        if self.running or self.finished:
            return

        try:
            self.running = True
            while True:
                future = self.future
                if not future.done:
                    return
                self.future = None
                try:
                    value = future.result
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(getattr(e, 'value', None))
                    self.result_future = None
                    return
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

首先这里值得注意的是,任何coroutine装饰的函数都会直接返回一个Future, 也就是说next()这个过程可以随便执行,因为生成一个Future对象也是不耗费资源的。

然后Runner会检查Future的done的状态,如果完成了,那么就切换至协程的断点继续执行,否则注册到ioloop中,Future done之后由ioloop来通知Runner进行调度。


好,我们来总结一下执行过程

  1. 生成Future,协程挂起
  2. Runner检查Future是否完成,完成则恢复协程执行,否则添加至ioloop中
  3. ioloop通知Runner 某个Future完成了,Runner恢复协程执行

同样的,我们看到利用了ioloop避开了io等待,从而实现了高效。



协程跟人来处理事情其实非常类似:

  1. 你准备焖饭,把饭放进电饭煲,焖好了会有铃声提醒
  2. 现在你会去炒菜,不会在电饭煲前傻等,相当与协程的切换
  3. 收到饭焖好了通知,停下炒菜去看一眼饭糊了没有,完成焖饭事件,删除改协程
  4. 回来继续炒菜

因为我们的目的是避开io等待,利用协程就是为了达到这个目的。



值得注意的是:

tornado中的io事件都会注册到callback中,比如用来进行网络请求tornado.httpclient.AsyncHTTPClient这个client,如果一个第三方库完全没有适配tornado的ioloop,如requests, 那么你使用它就会是一个灾难, 相当于在一个非阻塞的环境里强行进行阻塞操作,所以请勿使用任何未经适配tornado的第三方package在你基于tornado的项目中。




最后

zigmo项目

Github: A python web framework & wsgi server demo like tornado

参考

[1]. www.tornadoweb.org/en/stable/g…

[2]. www.tornadoweb.org/en/stable/c…

0 许可 CC BY-SA 3.0

© 2017 Leo Howell

评论