阅读 663

一起写个 WSGI Web Framework

作者简介

旺旺,switch狂热爱好者(掌游瘾少年),但是写代码的功力还是可以的,负责骑手相关的开发工作,常年充当老张、老赵、老方...等人的backup,同时常年把老张、老赵、老方...等人列为自己的backup

写在前面

本文中所列举的代码仅在 Python 2.7.15 和 Python 3.7.0 版本下进行编写测试。

什么是 WSGI

使用 Python 进行 Web 项目开发时,一定少不了听到 WSGI 这个词。WSGI 指的是某种 Web 服务么?或者是某个框架?还是应用程序的名字?

WSGI(Web Server Gateway Interface) 其实是一套调用约定(calling convention),它规定了 HTTP Server 与 HTTP Application 之间的数据交换方式。

引用 PEP333 里的背景介绍:

Python currently boasts a wide variety of web application frameworks, such as Zope, Quixote, Webware, SkunkWeb, PSO, and Twisted Web -- to name just a few. This wide variety of choices can be a problem for new Python users, because generally speaking, their choice of web framework will limit their choice of usable web servers, and vice versa.

By contrast, although Java has just as many web application frameworks available, Java's "servlet" API makes it possible for applications written with any Java web application framework to run in any web server that supports the servlet API.

规定这样一套约定的大致原因就是,Python 的 Web 框架越来越丰富多样,给 Python 开发者带来了多种选择的同时也带来了困扰——如果想从一个框架迁移到另一个上,需要对你的上层业务应用做不小的改动和适配。

因此在 Server 和 Application之间加入 WSGI 增加了可移植性。当然,你可以在 Server 与 Application 中间堆叠进多组中间件,前提是中间件需要实现 Server 和 Application 两侧的对应接口。

wsgi

Python 字符串编码

字符串编码可以说是 Python 初学者的「劝退怪」,UnicodeDecodeErrorUnicodeEncodeError 一路带大家「从入门到放弃」。虽然这在 Python 3 里有一定的缓解,但是当需要进行读写文件和我们马上就要处理的网络数据时,你依旧逃避不了。

Python 2 的原生字符 str 采用 ASCII 编码,支持的字符极其有限,同时字节类型 bytesstr 等同,而 Unicode 字符则使用内建 unicode

# Python 2.7
>>> str is bytes
True
>>> type('字符串')
<type 'str'>
>>> type(u'字符串')
<type 'unicode'>
复制代码

而 Python 3 之所以在字符编码方面对初学者友好,是因为 Python 3 原生字符 str 涵盖了 Unicode,不过又将 bytes 剥离了出来。

# Python 3
>>> str is bytes
False
>>> type('字符')
<class 'str'>
>>> type(b'byte')
<class 'bytes'>
复制代码

处理 HTTP 请求和处理文件一样都只接受字节类型,因此在编写 HTTP 应用时需要格外注意字符串编码问题,尤其是当你需要同时兼容 Python 2 和 Python 3 。

WSGI Application

首先,我们先来编写 WSGI 应用。

根据调用约定,应用侧需要适应一个可调用对象 (callable object) ,在 Python 中,可调用对象可以是一个函数 (function) ,方法 (method) ,一个类 (class) 或者是一个实现了 __call__() 方法的实例。同时,这个可调用对象还必须:

  1. 可接收两个位置参数:
    • 一个包含 CGI 键值的字典;
    • 一个用来构造 HTTP 状态和头信息的回调函数。
  2. 返回的 response body 必须是一个可迭代对象 (iterable) 。

这一章节着重讨论 WSGI 应用,因此我们直接引入 Python 内建的 simple_server 来装载我们的应用。

A Naive App

我们先完成一个可用的 WSGI 应用。

def application(
    # 包含 CGI 环境变量的字典,贯穿一整个请求过程,是请求的上下文
    environ, 
    # 调用方传入的回调方法,我们暂时不需要知道它具体做了什么,只需要
    # 在函数返回前调用它并传入 HTTP 状态和 HTTP 头信息即可
    start_response
):
    # 我们啥也不干,就把请求时的 method 返回给客户端
    body = 'Request Method: {}'.format(environ['REQUEST_METHOD'])
    # 注意,body原来是原生字符串,因此在往下传递数据前需要转化为字节类型
    body = body.encode('utf-8')
    
    # HTTP 返回状态,注意中间的空格
    status = '200 OK'
    
    # 返回的 HTTP 头信息,结构为
    # [(Header Name, Header Value)]
    headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(body))),
    ]
    
    # 调用 start_response 回调
    start_response(status, headers)
    
    # 返回 response body
    # 需要特别注意的是,返回值必须是一个可迭代对象 (iterable) ,
    # 同时,如果这里返回的是字符串,那么外部将会对字符串内每一个字符做单独处理,
    # 所以用列表包一下
    return [body]
复制代码

Put them together

最后,我们将写好的 callable 对象传入内建的 make_server 方法并绑定在本地 8010 端口上:

#! /usr/bin/env python
# coding: utf-8

from wsgiref.simple_server import make_server


def application(environ, start_response):
    body = 'Request Method: {}'.format(environ['REQUEST_METHOD'])
    body = body.encode('utf-8')
    status = '200 OK'
    headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Length', str(len(body))),
    ]
    start_response(status, headers)
    return [body]


def main():
    httpd = make_server('localhost', 8010, application)
    httpd.serve_forever()


if __name__ == '__main__':
    main()
复制代码

通过 curl 我们就能看到对应的返回了。

$ curl 127.0.0.1:8010 -i
# HTTP/1.0 200 OK
# Date: Sun, 06 Jan 2019 13:37:24 GMT
# Server: WSGIServer/0.1 Python/2.7.10
# Content-Type: text/plain
# Content-Length: 19
# 
# Request Method: GET

$ curl 127.0.0.1:8010 -i -XPOST
# HTTP/1.0 200 OK
# Date: Sun, 06 Jan 2019 13:38:15 GMT
# Server: WSGIServer/0.1 Python/2.7.10
# Content-Type: text/plain
# Content-Length: 20
#
# Request Method: POST
复制代码

A Step Further

写好了一个可用的应用,可也太难用了!

那么,我们就更近一步,对请求过程做一些封装和扩展。

class Request(object):

    MAX_BUFF_SIZE = 1024 ** 2

    def __init__(self, environ=None):
        self.environ = {} if environ is None else environ
        self._body = ''

    # 与请求中的 environ 做绑定
    def load(self, environ):
        self.environ = environ

    # QUERY_STRING 是 URL 里「?」后面的字符串
    # 这里我们解析这串字符,并且以键值的形式返回
    @property
    def args(self):
        return dict(parse_qsl(self.environ.get('QUERY_STRING', '')))

    @property
    def url(self):
        return self.environ['PATH_INFO']

    @property
    def method(self):
        return self.environ['REQUEST_METHOD']

    # 提供原生字符,方便再应用层内使用
    @property
    def body(self):
        return tonat(self._get_body_string())
    
    # 读取请求的 body
    # 数据可以通过 self.environ['wsgi.input'] 句柄读取
    # 调用读取方法使得文件指针后移,为了防止请求多次读取,
    # 直接将文件句柄替换成读到的数据
    def _get_body_string(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            return self.environ['wsgi.input']
        content_length = int(self.environ.get('CONTENT_LENGTH') or 0)
        if content_length > 0:
            self._body = read_func(max(0, content_length))
            self.environ['wsgi.input'] = self._body
        return self._body
    

# 因为 Python 是单线程,同时多线程在IO上不太友好
# 所以应用生命周期内只需要一个 request 请求对象就好
request = Request()
复制代码

接下来是封装返回对象。Response 对象需要确保 body 内的数据为字节类型。

class Response(object):

    default_status = '200 OK'

    def __init__(self, body='', status=None, **headers):
        # 将 body 转为字节类型
        self._body = tobytes(body)
        self._status = status or self.default_status
        self._headers = {
            'Content-Type': 'text/plain',
            
            # Content-Length 的计算需要在 body 转为字节类型后,
            # 否则由于编码的不同,字符串所需要的长度也不一致
            'Content-Length': str(len(self.body)),
        }
        if headers:
            for name, value in headers.items():
                # Python 惯用 snakecase 命名变量,
                # 所以我们需要对字符串做一个简单的转换
                self._headers[header_key(name)] = str(value)

    @property
    def body(self):
        return self._body

    @property
    def headerlist(self):
        return sorted(self._headers.items())

    @property
    def status_code(self):
        return int(self.status.split(' ')[0])

    @property
    def status(self):
        return self._status
复制代码

接下来就是对应用的封装。不过别忘了,它需要是一个 callable 对象。

class Application(object):

    def __init__(self, name):
        self.name = name

    def wsgi(self, environ, start_response):
        # 将请求的环境变量载入 request 对象
        request.load(environ)
        
        body = 'Request Method: {}'.format(request.method)
        response = Response(body)
        start_response(response.status, response.headerlist)
        return [tobytes(response.body)]

    def __call__(self, environ, start_response):
        return self.wsgi(environ, start_response)
    

app = Application(__name__)
httpd = make_server('localhost', 8010, app)
httpd.serve_forever()
复制代码

目前为止,我们已经将上一小节的代码做了封装和扩展,获取 HTTP 请求的数据更方便了。

接下来我们来完成 URL 路由功能。

class Application(object):
    
    def __init__(self, name):
        self.name = name
        self.routers = {}
    
    # 路由装饰器
    # 我们将注册的路由保存在 routers 字典里
    def route(self, url, methods=['GET'], view_func=None):
        def decorator(view_func):
            self.routers[url] = (methods, view_func)
        return decorator

    def _handle(self, request):
        methods, view_func = self.routers.get(request.url, (None, None))
        # 不仅要 URL 一致,method 也要和注册的一致才能调用对应的方法
        if methods is None or request.method not in methods:
            return Response(status='404 Not Found')
        return view_func()

    def wsgi(self, environ, start_response):
        request.load(environ)
        response = self._handle(request)
        start_response(response.status, response.headerlist)
        return [tobytes(response.body)]

    def __call__(self, environ, start_response):
        return self.wsgi(environ, start_response)
    
    
app = Application(__name__)

@app.route('/')
def index():
    return Response('Hello World!')

@app.route('/hungry', methods=['POST'])
def hugry():
	return Response('饿了就叫饿了么')

httpd = make_server('localhost', 8010, app)
httpd.serve_forever()
复制代码
$ curl 127.0.0.1:8010/hungry -i
# HTTP/1.0 404 Not Found
# Date: Sun, 06 Jan 2019 15:09:05 GMT
# Server: WSGIServer/0.2 Python/2.7.15
# Content-Length: 0
# Content-Type: text/plain

$ curl 127.0.0.1:8010/hungry -i -XPOST -d'yes'
# HTTP/1.0 200 OK
# Date: Sun, 06 Jan 2019 15:11:15 GMT
# Server: WSGIServer/0.1 Python/2.7.15
# Content-Length: 21
# Content-Type: text/plain
#
# 饿了就叫饿了么

复制代码

到这里,我们完成了 URL 和端点的注册和路由,有了用来解析 HTTP 请求的 Request 对象,也封装了 HTTP 接口返回的 Response 对象,已经完成了 WSGI Web Framework 主干道上的功能。

当然,这里「好用」还差得远。我们还需要有合理的异常处理 (Error Handling) ,URL 重组 (URL Reconstruction) ,对线程和异步的支持,对不同平台适配文件处理 (Platform-Specific File Handling) ,支持缓冲和流 (Stream) ,最好还能携带上 Websocket 。每一项都值得仔细探讨一番,篇幅有限,本文就不赘述了。

Put them together

#! /usr/bin/env python
# coding: utf-8

import os
import sys
import functools
from wsgiref.simple_server import make_server

py3 = sys.version_info.major > 2

if py3:
    from urllib.parse import unquote as urlunquote
    urlunquote = functools.partial(urlunquote, encoding='u8')

    unicode = str
    
else:
    from urllib import unquote as urlunquote


def tobytes(s, enc='utf-8'):
    if isinstance(s, unicode):
        return s.encode(enc)
    return bytes() if s is None else bytes(s)


def tounicode(s, enc='utf-8', err='strict'):
    if isinstance(s, bytes):
        return s.decode(enc, err)
    return unicode('' if s is None else s)


tonat = tounicode if py3 else tobytes


def parse_qsl(qs):
    r = []
    for pair in qs.replace(';', '&').split('&'):
        if not pair:
            continue
        kv = urlunquote(pair.replace('+', ' ')).split('=', 1)
        if len(kv) != 2:
            kv.append('')
        r.append((kv[0], kv[1]))
    return r


def header_key(key):
    return '-'.join([word.title() for word in key.split('_')])


class Request(object):

    MAX_BUFF_SIZE = 1024 ** 2

    def __init__(self, environ=None):
        self.environ = {} if environ is None else environ
        self._body = ''

    def load(self, environ):
        self.environ = environ

    @property
    def args(self):
        return dict(parse_qsl(self.environ.get('QUERY_STRING', '')))

    @property
    def url(self):
        return self.environ['PATH_INFO']

    @property
    def method(self):
        return self.environ['REQUEST_METHOD']

    @property
    def body(self):
        return tonat(self._get_body_string())

    def _get_body_string(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            return self.environ['wsgi.input']
        content_length = int(self.environ.get('CONTENT_LENGTH') or 0)
        if content_length > 0:
            self._body = read_func(max(0, content_length))
            self.environ['wsgi.input'] = self._body
        return self._body


class Response(object):

    default_status = '200 OK'

    def __init__(self, body='', status=None, **headers):
        self._body = tobytes(body)
        self._status = status or self.default_status
        self._headers = {
            'Content-Type': 'text/plain',
            'Content-Length': str(len(self.body)),
        }
        if headers:
            for name, value in headers.items():
                self._headers[header_key(name)] = str(value)

    @property
    def body(self):
        return self._body

    @property
    def headerlist(self):
        return sorted(self._headers.items())

    @property
    def status_code(self):
        return int(self.status.split(' ')[0])

    @property
    def status(self):
        return self._status


request = Request()


class Application(object):

    def __init__(self, name):
        self.name = name
        self.routers = {}

    def route(self, url, methods=['GET'], view_func=None):
        def decorator(view_func):
            self.routers[url] = (methods, view_func)
        return decorator

    def _handle(self, request):
        methods, view_func = self.routers.get(request.url, (None, None))
        if methods is None or request.method not in methods:
            return Response(status='404 Not Found')
        return view_func()

    def wsgi(self, environ, start_response):
        request.load(environ)
        response = self._handle(request)
        start_response(response.status, response.headerlist)
        return [tobytes(response.body)]

    def __call__(self, environ, start_response):
        return self.wsgi(environ, start_response)


def main():
    app = Application(__name__)

    @app.route('/')
    def index():
        return Response('Hello')

    @app.route('/hungry', methods=['POST'])
    def eleme():
        if request.body == 'yes':
            return Response('饿了就叫饿了么')
        return Response('再等等')

    httpd = make_server('localhost', 8010, app)
    httpd.serve_forever()


if __name__ == '__main__':
    main()
复制代码

WSGI Server

写完了 Application 是不是还不过瘾?那我们来看看 WSGI Server 要怎么工作。

本小节主要说明 WSGI 约定下 Server 与 Application 如何协作处理 HTTP 请求,为了避免过度讨论,引入 Python 内建 HTTPServerBaseHTTPRequestHandler ,屏蔽套接字和 HTTP 处理细节。

class WSGIServer(HTTPServer):

    def __init__(self, address, app):
        HTTPServer.__init__(self, address, WSGIRequestHandler)

        self.app = app
        self.environ = {
            'SERVER_NAME': self.server_name,
            'GATEWAY_INTERFACE': 'CGI/1.0',
            'SERVER_PORT': str(self.server_port),
        }
        

class WSGIRequestHandler(BaseHTTPRequestHandler):

    def handle_one_request(self):
        try:
            # 读取 HTTP 请求数据第一行:
            # <command> <path> <version><CRLF>
            # 例如:GET /index HTTP/1.0
            self.raw_requestline = self.rfile.readline()
            if not self.raw_requestline:
                self.close_connection = 1
                return
            
            # 解析请求元数据
            elif self.parse_request():
                return self.run()
        except Exception:
            self.close_connection = 1
            raise
复制代码

这段代码就比较简单了。 WSGIServer 的主要工作就是初始化一些实例属性,其中包括注册 WSGI 应用和初始化 environ 变量。Server 接收请求后都会调用一次 RequestHandler ,同时将客户端发来的数据传入。RequestHandler 的核心方法是 handle_one_request ,负责处理每一次请求数据。

我们先来初始化请求的变量和上下文:

    def make_environ(self):
        if '?' in self.path:
            path, query = self.path.split('?', 1)
        else:
            path, query = self.path, ''
        path = urlunquote(path)
        environ = os.environ.copy()
        environ.update(self.server.environ)
        environ.update({
            # 客户端请求体句柄,可以预读
            'wsgi.input': self.rfile,
            'wsgi.errors': sys.stderr,
            
            # WSGI 版本,沿用默认 1.0
            'wsgi.version': (1, 0),
            
            # 我们的实现版本既不是多线程也不是多进程
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            
            # 表示 server/gateway 处理请求时只调用应用一次
            # ** 这个变量我没能找到详尽的说明和具体使用的地方 **
            'wsgi.run_once': True,
            'wsgi.url_scheme': 'http'

            'SERVER_PROTOCOL': '1.0',
            'REQUEST_METHOD': self.command,
            'QUERY_STRING': query,
            'PATH_INFO': urlunquote(path),
            'CONTENT_LENGTH': self.headers.get('content-length')
        })
        return environ
复制代码

接下来我们按照 WSGI Server 侧的调用约定完成 run 方法, writestart_reponse 两个闭包分别完成数据写入和头信息设置。

    def run(self):
        # 初始化请求的上下文
        environ = self.make_environ()
        
        headers_set = []
        headers_sent = []

        def write(data):
            # 确保在写入 response body 之前头信息已经设置
            assert headers_set, 'write() before start_response()'
            
            if not headers_sent:
                status, response_headers = headers_sent[:] = headers_set
                try:
                    code, msg = status.split(' ', 1)
                except ValueError:
                    code, msg = status, ''
                code = int(code)
                self.wfile.write(tobytes('{} {} {}\r\n'.format(
                    self.protocol_version, code, msg)))
                for header in response_headers:
                    self.wfile.write(tobytes('{}: {}\r\n'.format(*header)))
                self.wfile.write(tobytes('\r\n'))
			
            # 确保 body 为字节类型
            assert isinstance(data, bytes), 'applications must write bytes'
            self.wfile.write(data)
            self.wfile.flush()

        def start_response(status, response_headers, exc_info=None):
            if exc_info:
                try:
                    # 如果头信息发送,只能重抛异常
                    if headers_sent:
                        reraise(*exc_info)
                finally:
                    # 避免 traceback 循环引用
                    exc_info = None
                    
            elif headers_set:
                raise AssertionError('Headers already set!')

            headers_set[:] = [status, response_headers]
            return write

        # 这里就是调用 WSGI 应用
        result = self.server.app(environ, start_response)
        try:
            # 循环 WSGI 应用的返回值并写入
            # 从这一步可以看出,如果应用返回的是字符串而不是列表,
            # 那么字符串内的每一个字符都会调用一次 write
            for data in result:
                if data:
                    write(data)
            if not headers_sent:
                write(tobytes(''))
        finally:
            if hasattr(result, 'close'):
                result.close()
复制代码

Put them together

#! /usr/bin/env python
# coding: utf-8

import os
import sys
import functools

py3 = sys.version_info.major > 2

if py3:
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import unquote as urlunquote
    urlunquote = functools.partial(urlunquote, encoding='u8')

    def reraise(*a):
        raise a[0](a[1]).with_traceback(a[2])

else:
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
    from urllib import unquote as urlunquote

    exec(compile('def reraise(*a): raise a[0], a[1], a[2]', '<py3fix>', 'exec'))

    
class WSGIServer(HTTPServer):

    def __init__(self, address, app):
        HTTPServer.__init__(self, address, WSGIRequestHandler)

        self.app = app
        self.environ = {
            'SERVER_NAME': self.server_name,
            'GATEWAY_INTERFACE': 'CGI/1.0',
            'SERVER_PORT': str(self.server_port),
        }


class WSGIRequestHandler(BaseHTTPRequestHandler):

    def make_environ(self):
        if '?' in self.path:
            path, query = self.path.split('?', 1)
        else:
            path, query = self.path, ''
        path = urlunquote(path)
        environ = os.environ.copy()
        environ.update(self.server.environ)
        environ.update({
            'wsgi.input': self.rfile,
            'wsgi.errors': sys.stderr,
            'wsgi.version': (1, 0),
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            'wsgi.run_once': True,
            'wsgi.url_scheme': 'http'

            'SERVER_PROTOCOL': '1.0',
            'REQUEST_METHOD': self.command,
            'QUERY_STRING': query,
            'PATH_INFO': urlunquote(path),
            'CONTENT_LENGTH': self.headers.get('content-length')
        })
        return environ

    def run(self):
        environ = self.make_environ()
        headers_set = []
        headers_sent = []

        def write(data):
            assert headers_set, 'write() before start_response()'
            if not headers_sent:
                status, response_headers = headers_sent[:] = headers_set
                try:
                    code, msg = status.split(' ', 1)
                except ValueError:
                    code, msg = status, ''
                code = int(code)
                self.wfile.write(tobytes('{} {} {}\r\n'.format(
                    self.protocol_version, code, msg)))
                for header in response_headers:
                    self.wfile.write(tobytes('{}: {}\r\n'.format(*header)))
                self.wfile.write(tobytes('\r\n'))

            assert isinstance(data, bytes), 'applications must write bytes'
            self.wfile.write(data)
            self.wfile.flush()

        def start_response(status, response_headers, exc_info=None):
            if exc_info:
                try:
                    if headers_sent:
                        reraise(*exc_info)
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError('Headers already set!')

            headers_set[:] = [status, response_headers]
            return write

        result = self.server.app(environ, start_response)
        try:
            for data in result:
                if data:
                    write(data)
            if not headers_sent:
                write(tobytes(''))
        finally:
            if hasattr(result, 'close'):
                result.close()

    def handle_one_request(self):
        try:
            self.raw_requestline = self.rfile.readline()
            print(self.raw_requestline)
            if not self.raw_requestline:
                self.close_connection = 1
                return
            elif self.parse_request():
                return self.run()
        except Exception:
            self.close_connection = 1
            raise
           
        
def make_server(host, port, app):
    server = WSGIServer((host, port), app)
    return server


app = Application(__name__)

@app.route('/')
def index():
	return Response('Hello')
    
httpd = make_server('localhost', 8010, app)
httpd.serve_forever()
复制代码

参考





阅读博客还不过瘾?

欢迎大家扫二维码通过添加群助手,加入交流群,讨论和博客有关的技术问题,还可以和博主有更多互动

博客转载、线下活动及合作等问题请邮件至 shadowfly_zyl@hotmail.com 进行沟通

关注下面的标签,发现更多相似文章
评论