Tornado as I understand it: the ioloop part

2017-03-13
Original link: leohowell.com

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user.

Keyword: asynchronous

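A Python web stack is usually split into two parts: a WSGI server and a web framework.

The WSGI server implements the WSGI protocol, which is the interface to the web framework. It separates request handling at the socket layer from request handling at the framework layer, so the framework does not need to care how the lower layer is implemented.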
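
To make that split concrete, here is a minimal sketch of the WSGI calling convention described in PEP 3333: the server calls the application with an `environ` dict and a `start_response` callable, and the application returns an iterable of byte strings. The names `hello_app` and `call_app` and the hand-built `environ` are assumptions made up for illustration; they are not part of tornado or zigmo.

```python
def hello_app(environ, start_response):
    """A tiny WSGI application (hypothetical example)."""
    body = ('Hello from %s' % environ['PATH_INFO']).encode('utf-8')
    start_response('200 OK', [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(body))),
    ])
    return [body]


def call_app(app, path):
    """Play the server's role: build environ, call the app, collect output."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        # The server remembers status and headers until it writes the response.
        captured['status'] = status
        captured['headers'] = headers

    # A real server fills in many more keys; these three are enough here.
    environ = {'REQUEST_METHOD': 'GET', 'PATH_INFO': path, 'QUERY_STRING': ''}
    body = b''.join(app(environ, start_response))
    return captured['status'], captured['headers'], body


status, headers, body = call_app(hello_app, '/ping')
print(status, body)
```

Everything on the `call_app` side is the server's job, and everything inside `hello_app` is the framework's job; neither needs to know how the other is built.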

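For example, Django can run on a WSGI server, and Tornado can host WSGI applications (through `tornado.wsgi.WSGIContainer`), so Django can run on Tornado (it really can; come hit me if it doesn't :()

So what is so special about tornado?

The server underneath is an asynchronous, non-blocking implementation.

Asynchronous means that when I issue a request, the call returns right away whether or not the work has finished; I do not wait for the result, I am notified of it later.
Blocking means the current thread is suspended until the call returns its result, and it just sits there waiting.
Non-blocking means the thread is not suspended: Tornado arranges things so that it never gets stuck waiting.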
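
The difference can be seen on a plain socket. The following is a small sketch, assuming a Unix-like system where `socket.socketpair()` is available; the variable names are made up for illustration. It uses a short timeout to stand in for a blocking call, since a truly blocking `recv()` on an empty socket would wait forever.

```python
import socket

# A connected pair of sockets stands in for a client and a server.
a, b = socket.socketpair()

# Non-blocking mode: recv() on an empty buffer raises instead of waiting.
b.setblocking(False)
try:
    b.recv(1024)
    would_block = False
except BlockingIOError:
    would_block = True

# Once data has arrived, the same call returns it immediately.
a.sendall(b'ping')
data = b.recv(1024)

# With a timeout, recv() waits for data (here 0.1 s) before giving up.
b.settimeout(0.1)
try:
    b.recv(1024)
    timed_out = False
except socket.timeout:
    timed_out = True

a.close()
b.close()
print(would_block, data, timed_out)
```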

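Let's look at how an HTTP request is handled.

A quick TIP first:
HTTP is a text protocol made up of request messages and response messages. Anything written in that format is HTTP.
So what we are really analyzing is a TCP connection; we simply parse its bytes according to the HTTP protocol.
Now come and feel the charm of \n\n and \n\r\n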
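
As a rough illustration of "it is just text", the sketch below takes the bytes of a request and splits them apart with ordinary string operations. The request itself is a made-up example, and the parsing is deliberately naive; it is not how tornado parses HTTP.

```python
raw_request = (
    b'GET /hello?name=tornado HTTP/1.1\r\n'
    b'Host: localhost:8080\r\n'
    b'Accept: */*\r\n'
    b'\r\n'
)

# The blank line (\r\n\r\n) separates the head from the optional body.
head, _, body = raw_request.partition(b'\r\n\r\n')
lines = head.decode('iso-8859-1').split('\r\n')

# First line: method, request target, protocol version.
method, target, version = lines[0].split(' ')
# Remaining lines: "Name: value" headers.
headers = dict(line.split(': ', 1) for line in lines[1:])

print(method, target, version)
print(headers)
```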

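import socket

# The headers end at a blank line. Accept both bare-LF and CRLF line
# endings (real clients send \r\n\r\n, which does not contain \n\n).
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)

try:
    while True:
        # Blocks until a client connects.
        connectiontoclient, address = serversocket.accept()
        request = b''
        # Keep reading until the end of the request headers shows up.
        while EOL1 not in request and EOL2 not in request:
            request += connectiontoclient.recv(1024)
        print('-'*40 + '\n' + request.decode()[:-2])
        # sendall() keeps writing until the whole response has been sent.
        connectiontoclient.sendall(response)
        connectiontoclient.close()
finally:
    serversocket.close()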

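response is the HTTP response message. If you are not familiar with it, print it and have a look.

We would call this a blocking server.
Requests are handled one after another (stating the obvious, since there is no concurrency). Note the following line: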

request += connectiontoclient.recv(1024)

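recv(1024) blocks. It keeps reading from the socket's receive buffer, and if the data has not arrived from the network yet, the call blocks until it does.
(There is actually one more blocking step: once the data is ready it still has to be copied from the kernel into the application buffer. To keep things simple, that part is ignored here.)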




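How can this be improved?

Tornado's answer is to use epoll (kqueue/select are not discussed here)

What is epoll?

epoll is a scalable I/O event notification mechanism of the Linux kernel. --Wikipedia

Oh, so it is just an I/O notification. _(:з」∠)_

To keep it simple, think of it this way: it notifies you when a socket's receive buffer has data to read, when its send buffer has room to write, and when the connection has broken. :(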
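
Here is a minimal sketch of a single notification, assuming Linux (`select.epoll` does not exist elsewhere). A `socket.socketpair()` stands in for a real network connection, and the variable names are made up for illustration.

```python
import select
import socket

a, b = socket.socketpair()
b.setblocking(False)
b_fd = b.fileno()

epoll = select.epoll()
# Ask to be told when b has data to read.
epoll.register(b_fd, select.EPOLLIN)

# Nothing has been sent yet, so a zero-timeout poll reports no events.
before = epoll.poll(0)

a.sendall(b'hello')

# Now b's receive buffer holds data: poll() returns (fd, event mask) pairs.
after = epoll.poll(1)
fd, event = after[0]
readable = bool(event & select.EPOLLIN)
data = b.recv(1024)  # returns immediately, the data is already there

epoll.unregister(b_fd)
epoll.close()
a.close()
b.close()
print(before, fd == b_fd, readable, data)
```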

Now for an epoll version of the HTTP server

import select
import socket

# The headers end at a blank line. Accept both bare-LF and CRLF line
# endings (real clients send \r\n\r\n, which does not contain \n\n).
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
# The listening socket becomes readable when a client is waiting.
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll(1)
        for fileno, event in events:
            if fileno == serversocket.fileno():
                # New client: track it and wait for its request.
                connection, address = serversocket.accept()
                connection.setblocking(0)
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
            elif event & select.EPOLLIN:
                # Data is ready, so recv() does not wait.
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    # Request complete: switch to waiting for writability.
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-'*40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:
                # send() may write only part of the response; keep the rest.
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if len(responses[fileno]) == 0:
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                # Connection is gone: forget everything about it.
                epoll.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
                requests.pop(fileno, None)
                responses.pop(fileno, None)
finally:
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()

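How is this different from the first HTTP server?
"The receive buffer has data, come and read it": you are notified with select.EPOLLIN, and then you call recv(1024).
"The request is complete and the send buffer has room, go and send the response": you are notified with select.EPOLLOUT.

What does the server do while a connection has nothing to report?
It serves other sockets, including calling accept() for new connections!

Each connection corresponds to a file descriptor (fd), and epoll delivers its notifications per descriptor.
Where is the magic?
recv(1024) is only called after epoll has reported the socket readable, so it always has data to read and never waits for data to arrive.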




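Whoa, is this already asynchronous and non-blocking?

Full speed ahead! ioloop is just a layer of wrapping around epoll! It is all a paper tiger!

Let's go and read a (simplified) tornado-style ioloop implementation, and catch up with the big players!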

import time
import select
import functools
import collections


class IOLoop(object):
    _EPOLLIN = 0x001
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010

    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    PULL_TIMEOUT = 1

    def __init__(self):
        self.handlers = {}
        self.events = {}
        self.epoll = select.epoll()

        self._future_callbacks = collections.deque()

    @staticmethod
    def instance():
        if not hasattr(IOLoop, '_instance'):
            IOLoop._instance = IOLoop()
        return IOLoop._instance

    def add_handler(self, fd_obj, handler, event):
        fd = fd_obj.fileno()
        self.handlers[fd] = (fd_obj, handler)
        self.epoll.register(fd, event)

    def update_handler(self, fd, event):
        self.epoll.modify(fd, event)

    def remove_handler(self, fd):
        self.handlers.pop(fd, None)
        try:
            self.epoll.unregister(fd)
        except Exception as error:
            print('epoll unregister failed %r' % error)

    def replace_handler(self, fd, handler):
        self.handlers[fd] = (self.handlers[fd][0], handler)

    def start(self):
        try:
            while True:
                for i in range(len(self._future_callbacks)):
                    callback = self._future_callbacks.popleft()
                    callback()

                events = self.epoll.poll(self.PULL_TIMEOUT)
                self.events.update(events)
                while self.events:
                    fd, event = self.events.popitem()
                    try:
                        fd_obj, handler = self.handlers[fd]
                        handler(fd_obj, event)
                    except Exception as error:
                        print('ioloop callback error: %r' % error)
                        time.sleep(0.5)
        finally:
            # Iterate over a copy: remove_handler() mutates self.handlers.
            for fd in list(self.handlers):
                self.remove_handler(fd)
            self.epoll.close()
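
To see the handler convention in action (a handler is called as `handler(fd_obj, event)`), here is a self-contained sketch, again assuming Linux for `select.epoll`. `TinyLoop` and `on_readable` are hypothetical names for a cut-down stand-in with a single `run_once()` pass; this is not the class above and not tornado's API.

```python
import select
import socket


class TinyLoop(object):
    """Hypothetical cut-down loop: maps fd -> (object, handler)."""

    def __init__(self):
        self.handlers = {}
        self.epoll = select.epoll()

    def add_handler(self, fd_obj, handler, event):
        self.handlers[fd_obj.fileno()] = (fd_obj, handler)
        self.epoll.register(fd_obj.fileno(), event)

    def remove_handler(self, fd):
        self.handlers.pop(fd, None)
        self.epoll.unregister(fd)

    def run_once(self, timeout=1):
        # Dispatch every ready fd to the handler registered for it.
        for fd, event in self.epoll.poll(timeout):
            fd_obj, handler = self.handlers[fd]
            handler(fd_obj, event)


loop = TinyLoop()
client, server = socket.socketpair()
server.setblocking(False)
received = []


def on_readable(sock, event):
    # Called by the loop only once the socket has data to read.
    received.append(sock.recv(1024))
    loop.remove_handler(sock.fileno())


loop.add_handler(server, on_readable, select.EPOLLIN)
client.sendall(b'ping')
loop.run_once()

loop.epoll.close()
client.close()
server.close()
print(received)
```

The application code never calls `poll()` itself. It only registers callbacks, and the loop decides when to run them.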

How do you feel after reading it?
Is that all there is?
Sigh~ tornado, young man, naive!

So you see, one still has to keep raising one's level of knowledge




One more version of the HTTP server!

import io
import sys
import socket
import logging
from datetime import datetime

from ioloop import IOLoop


EOL1 = b'\n\n'
EOL2 = b'\n\r\n'

logging.basicConfig(level=logging.INFO)
# Not defined in the original excerpt; assumed to be an ordinary logger.
access_logger = logging.getLogger('access')


class Connection(object):
    """Per-connection state.

    This class is not shown in the original post; it is a minimal assumed
    version holding only the attributes that WSGIServer uses below.
    """

    def __init__(self, fd):
        self.fd = fd
        self.address = None
        self.request_buffer = []  # received byte fragments
        self.status = None
        self.headers = []
        self.response = b''
        self.handled = False


class WSGIServer(object):
    ADDRESS_FAMILY = socket.AF_INET
    SOCKET_TYPE = socket.SOCK_STREAM
    BACKLOG = 5

    HEADER_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
    SERVER_NAME = 'zigmo/WSGIServer 0.3'

    def __init__(self, server_address):
        self.ssocket = self.setup_server_socket(server_address)
        host, self.server_port = self.ssocket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)

        self.ioloop = IOLoop.instance()
        self.conn_pool = {}

    @classmethod
    def setup_server_socket(cls, server_address):
        ssocket = socket.socket(cls.ADDRESS_FAMILY, cls.SOCKET_TYPE)
        ssocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        ssocket.bind(server_address)
        ssocket.listen(cls.BACKLOG)
        ssocket.setblocking(0)
        return ssocket

    def set_app(self, application):
        self.application = application

    def _accept(self, ssocket, event):
        if event & IOLoop.ERROR:
            self._close(ssocket)
            return

        connect, addr = ssocket.accept()
        connect.setblocking(0)
        ioloop = IOLoop.instance()
        ioloop.add_handler(connect, self._receive, IOLoop.READ)

        fd = connect.fileno()
        connection = Connection(fd)
        connection.address = addr
        self.conn_pool[fd] = connection

    def _receive(self, connect, event):
        if event & IOLoop.ERROR:
            self._close(connect)
            return

        fd = connect.fileno()
        connection = self.conn_pool[fd]
        fragment = connect.recv(1024)
        if not fragment:
            # The peer closed before sending a complete request.
            self._close(connect)
            return
        connection.request_buffer.append(fragment)

        # The terminator may straddle two fragments, so check the last two.
        last_fragment = b''.join(connection.request_buffer[-2:])
        if EOL1 in last_fragment or EOL2 in last_fragment:
            # Headers complete: wait for writability and switch handlers.
            ioloop = IOLoop.instance()
            ioloop.update_handler(fd, IOLoop.WRITE)
            ioloop.replace_handler(fd, self._send)

    def _send(self, connect, event):
        if event & IOLoop.ERROR:
            self._close(connect)
            return

        fd = connect.fileno()
        connection = self.conn_pool[fd]
        if not connection.handled:
            self.handle(connection)

        # send() may write only part of the response; keep the remainder.
        byteswritten = connect.send(connection.response)
        if byteswritten:
            connection.response = connection.response[byteswritten:]

        if not len(connection.response):
            self._close(connect)

    def _close(self, connect, event=None):
        fd = connect.fileno()

        # Unregister before closing: closing first would drop the fd from
        # epoll and make the unregister call fail.
        ioloop = IOLoop.instance()
        ioloop.remove_handler(fd)

        try:
            connect.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass  # already disconnected, or a listening socket
        connect.close()

        # The listening socket is not in the pool, so do not insist.
        self.conn_pool.pop(fd, None)

    def serve_forever(self):
        self.ioloop.add_handler(self.ssocket, self._accept,
                                IOLoop.READ | IOLoop.ERROR)
        try:
            self.ioloop.start()
        finally:
            self.ssocket.close()

    def handle(self, connection):
        def start_response(status, response_headers, exc_info=None):
            utc_now = datetime.utcnow().strftime(self.HEADER_DATE_FORMAT)
            connection.headers = response_headers + [
                ('Date', utc_now),
                ('Server', self.SERVER_NAME),
            ]
            connection.status = status

        raw_request = b''.join(connection.request_buffer)
        request_text = raw_request.decode('iso-8859-1')
        environ = self.get_environ(request_text)
        # A WSGI application returns an iterable of byte strings.
        body = b''.join(self.application(environ, start_response))
        connection.response = self.package_response(body, connection)
        connection.handled = True

        request_line = request_text.splitlines()[0]
        access_logger.info(
            '%s "%s" %s %s', connection.address[0], request_line,
            connection.status.split(' ', 1)[0], len(body),
        )
        access_logger.debug('\n' + ''.join(
            '< {line}\n'.format(line=line)
            for line in request_text.splitlines()
        ))

    @classmethod
    def parse_request_buffer(cls, text):
        content_lines = text.splitlines()

        # Request line, e.g. "GET /path?query HTTP/1.1".
        request_line = content_lines[0].rstrip('\r\n')
        request_method, path, request_version = request_line.split()
        if '?' in path:
            path, query_string = path.split('?', 1)
        else:
            path, query_string = path, ''

        return {
            'PATH_INFO': path,
            'REQUEST_METHOD': request_method,
            'SERVER_PROTOCOL': request_version,
            'QUERY_STRING': query_string,
        }

    def get_environ(self, request_text):
        request_data = self.parse_request_buffer(request_text)
        # 'HTTP/1.1' -> 'http'
        scheme = request_data['SERVER_PROTOCOL'].split('/')[0].lower()
        # wsgi.input carries the request body, i.e. whatever follows the
        # blank line (usually empty here, since reading stops at the headers).
        _, _, request_body = request_text.partition('\r\n\r\n')
        environ = {
            'wsgi.version': (1, 0),
            'wsgi.url_scheme': scheme,
            'wsgi.input': io.BytesIO(request_body.encode('iso-8859-1')),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'SERVER_NAME': self.server_name,
            'SERVER_PORT': str(self.server_port),
        }
        environ.update(request_data)
        return environ

    def package_response(self, body, connection):
        head = 'HTTP/1.1 {status}\r\n'.format(status=connection.status)
        for header in connection.headers:
            head += '{0}: {1}\r\n'.format(*header)
        head += '\r\n'
        # Status line and headers are text; the body is already bytes.
        response = head.encode('iso-8859-1') + body
        access_logger.debug('\n' + ''.join(
            '> {line}\n'.format(line=line)
            for line in response.decode('iso-8859-1').splitlines()
        ))
        return response

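This... is getting a bit long
TL;DR
Too long, not reading!
Off to eat!

The WSGIServer class above plus the ioloop is a simplified version of tornado's WSGI server.
Compared with our second HTTP server, it mostly tidies up the code and standardizes the interfaces.

Read it or don't!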

excited!



Finally

The zigmo project

Github: A python web framework & wsgi server demo like tornado


License: CC BY-SA 3.0

© 2017 Leo Howell
