『Microservices & Nameko』Python 微服务实践

10,767 阅读10分钟

1. Microservices

微服务最近一二年非常热门,谈论也比较多,简单的说,微服务将单一应用程序作为由众多小型服务构成之套件加以开发的方式,其中各项服务都拥有自己的进程并利用轻量化机制(通常为HTTP源API)实现通信。下面来一张示例图:

Microservices Architecture:

上面2幅图已经形象说明微服务是什么东西了,同时软件部署方式需要建立在容器上。微服务相关生态会在Java和Go语言中比较成熟,尤其是Java。而Python作为后端,这方面会比较弱一点,微服务框架目前能看到了也就Nameko,并且技术也没那么成熟,因目前业务Python场景比Go语言稍多,所以先来玩一下Python如何玩微服务吧。

2. Service Mesh and Serverless

关于微服务,还有2个概念也比较热,下面简单提及一下。

2.1 Service Mesh

Service Mesh 服务网格,这个概念刚开始晦涩难懂,网上也有人说事下一代微服务,简单的说,当成千上万的微服务部署在Kubernetes 上,整体来说也是相当复杂的,因为每个微服务都需要健康检查、处理错误、延时等,而Kubernetes虽然可以提供健康检查和自动恢复,但是还需要熔断模式、服务发现、API管理、加密校验等等。而这些就是 Service Mesh需要解决的问题。

更加详细的介绍: philcalcado.com/2017/08/03/…

作为服务间通信的基础设施层,可以将它比作是应用程序或者说微服务间的TCP/IP,负责服务之间的网络调用、限流、熔断和监控。对于编写应用程序来说一般无须关心TCP/IP这一层(比如通过 HTTP 协议的 RESTful 应用),同样使用Service Mesh也就无须关系服务之间的那些原来是通过应用程序或者其他框架实现的事情,比如Spring Cloud,现在只要交给Service Mesh就可以了。

Service Mesh有如下几个特点:

  • 应用程序间通讯的中间层
  • 轻量级网络代理
  • 应用程序无感知
  • 解耦应用程序的重试/超时、监控、追踪和服务发现

Service Mesh的架构如下图所示:

比较流行的开源软件有Istio,有机会再去玩一下。

2.2 Serverless

无服务架构,第一次接触是在AWS的技术峰会上,简单的说就是不需要关心服务器,整个计算堆栈,包括运行功能代码的操作系统进程,完全由云提供商管理。更加强化了 DevOps 的理念。

实际玩过AWS Lambda 无服务应用程序,确实很方便,简化为一个函数,通过 API Gateway + Lambda 则可实现Web服务。按请求量收费,这一点目前觉得很坑,尤其是请求量大时,产生的费用远远比自己将应用部署在Docker上会贵很多。 所以目前无服务架构的场景也是非常适合一些一次性任务,请求量调用不多的场景来说会非常方便,开发者成员就可以自己开发自己部署,不再需要关心服务器。 可以免除所有运维性操作,开发人员可以更加专注于核心业务的开发,实现快速上线和迭代。

3. Python framework for building microservices

3.1 Nameko Introduce

Nameko是Python中的微服务框架,git地https://github.com/nameko/nameko,受欢迎度暂时还不高,官方文档的介绍实现了:

It comes with built-in support for:

  • RPC over AMQP
  • Asynchronous events (pub-sub) over AMQP
  • Simple HTTP GET and POST
  • Websocket RPC and subscriptions (experimental)

简单的说RPC建立在AMQP上,在AMQP上实现了发布订阅,实现了简单的HTTP服务,还有Websocket RPC。

这简直跟Java的生态完全感觉是小儿科。架构通过RabbitMQ作为message broker,供给各个Nameko Service之间的通信。

更多的细节请查看官方文档。

3.2 Practice

接下来实践一下,以某一个业务场景为例。

场景: 假设社交场景中,评论别人的文章,服务器给文章作者推送一条消息告知有人评论,同时评论必须得先注册。

涉及到2个微服务,注册服务和推送服务,同时有一个评论接口。

3.2.1 环境搭建:

  • python3.5+
  • RabbitMQ
  • Redis 3.2.1
  • Nameko 2.11.0
  • Swagger
  • Flask 1.0.2

首先需要准备Python3环境,Redis简单起见作为用户登录注册的存储,Nameko用pip安装,RabbitMQ最好用Docker安装。

# RabbitMQ docker 安装命令
docker search rabbitmq
docker pull rabbitmq:3.7-rc-management
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7-rc-management
# 需要默认运行在5672端口

Doc: https://github.com/docker-library/docs/tree/master/rabbitmq

# nameko 运行服务命令:
nameko run service --broker amqp://guest:guest@localhost

其中 guest:guest是RabbitMQ Docker镜像的用户名和密码

同时为了方便API测试,通过flasgger提供Swagger UI进行集成Flask。

准备好环境后,开始代码部分演示。

3.2.2 代码演示:

├── app
│   └── api.py
├── dependence
│   ├── __init__.py
│   └── services
│       ├── __init__.py
│       ├── config.py
│       └── redis_service.py
└── microservices
    ├── push.py
    └── register.py

代码结构如上:

  • app中存储的是api接口服务。
  • dependence可以理解为基础模块,可能很多的微服务都依赖的封装好的服务,比如redis,mysql的接口,一般用Git仓库的话,可以submodule到具体服务的仓库下,这里测试就全部放一个仓库了。
  • microservices 微服务代码,这里演示2个服务,注册和推送服务。

需要实践的是2个功能:

  • API代码中如何调用微服务
  • 微服务中如何调用其他微服务

先介绍一下dependence中的代码:

# content of redis_service
class RedisService(object):
    def __init__(self):
        self.redis_instance = RedisClient.get_redis(
            config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT,
            config.REDIS_DB)
        self.users_key = "users"
        self.users_data_key = "users_data"

    def check_registered_and_get_info(self, u_id):
        """
        Check if the user is registered and return user information if registered.
        """
        user_data = self.redis_instance.hget(self.users_data_key, u_id)
        if not user_data:
            return False, None
        return True, json.loads(user_data)

    def check_email_is_registered(self, email):
        u_id = self.redis_instance.hget(self.users_key, email)
        return u_id

    def register(self, u_id, email, data):
        self.redis_instance.hset(self.users_key, email, u_id)
        result = self.redis_instance.hset(self.users_data_key, u_id,json.dumps(data))
        return result
  • check_registered_and_get_info 校验是否已经注册,如果已经注册则获取用户信息返回
  • check_email_is_registered 检查邮箱是否重复注册
  • register 注册并存储用户信息

接下来看API代码部分:

# content of api.py
import time
import random
from flask import Flask, request, jsonify
from flasgger import Swagger
from nameko.standalone.rpc import ClusterRpcProxy
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--port", help="app running port", type=int, default=5000)
parse_args = parser.parse_args()

app = Flask(__name__)
Swagger(app)

CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}

代码校长,分开展示,这部分是引用部分,需要nameko和Swagger,nameko是提供微服务的RPC服务代理,同时需要提供CONFIG,内容是Message Broker地址,其实就RabbitMQ。 Swagger是和Flask结合,方便网页界面进行API测试。

# content of api.py
@app.route('/api/v1/comment', methods=['POST'])
def comment():
    """
    Comment API

    Parameters Explain:

        timestamp    评论时间
        u_id         用户id
        content      评论内容
        article_id   文章ID
        article_u_id 文章作者用户id
        parent_comment_id 父评论id (optional)
    ---
    parameters:
      - name: body
        in: body
        required: true
        schema:
          id: comment
          properties:
            timestamp:
              type: integer
            u_id:
              type: string
            content:
              type: string
            article_id:
              type: integer
            article_u_id:
              type: integer
            parent_comment_id:
              type: integer
    responses:
      code:
        description: 0 Comment Success!
      message:
        description: Error Message!
      data:
        description: return comment_id
    """
    data = request.json
    article_u_id = data.get("article_u_id")
    u_id = data.get("u_id")
    code, message = 0, ""
    if not article_u_id or not u_id:
        code, message = 10003, "article_u_id or u_id is null."
        response = dict(code=code, message=message, data="")
        return jsonify(response)
    with ClusterRpcProxy(CONFIG) as rpc:
        user_data = rpc.register.check_registered(u_id)
        if not user_data:
            code, message = 10004, "You need to register to comment."
            response = dict(code=code, message=message, data="")
            return jsonify(response)

        # push message
        print("Push Message: article_u_id: {}".format(article_u_id))
        result, message = rpc.push.push(article_u_id, data.get("content"))
        print("push result: {}, message: {}".format(result, message))

    # save comment data
    print("Save Comment Data: article_id: {} content: {}".format(
        data.get("article_id"), data.get("content")))

    data = dict(comment_id=int(time.time()))
    response = dict(code=0, message="", data=data)
    return jsonify(response)

评论接口,描述部分是提供Swagger的API接口描述(其规范需要遵循Swagger规范,具体可以查看官方文档),提供评论者的用户ID,文章ID,评论的内容,文章作者用户ID(简单起见,直接客户端提供,正常场景是根据文章ID找到作者的用户ID)。

实现的功能也非常简单,先通过调用检查注册服务看评论者是否有注册,没有就直接返回需要注册才能评论。如果已经注册,则调用推送服务给作者进行推送通知。之后并保存评论信息,返回评论ID。

关键信息就是在 注册和推送 微服务的实现,保存评论信息,我这里直接print,没有做实际的操作。

@app.route('/api/v1/register', methods=['POST'])
def register():
    """
    Register API

    Parameters Explain:

        timestamp    注册时间
        email        注册邮箱
        name         名称
        language     语言
        country      国家
    ---
    parameters:
      - name: body
        in: body
        required: true
        schema:
          id: data
          properties:
            timestamp:
              type: integer
            email:
              type: string
            name:
              type: string
            language:
              type: string
            country:
              type: string
    responses:
      code:
        description: 0 register success.
      message:
        description: Error Message!
      data:
          description: return u_id
    """

    user_data = request.json
    email = user_data.get("email")
    code, message = 0, ""
    if not email:
        code, message = 10000, "email is null."
        response = dict(code=code, message=message, data="")
        return jsonify(response)
    u_id = None
    with ClusterRpcProxy(CONFIG) as rpc:
        u_id, message = rpc.register.register(email, user_data)
    if message:
        code = 10001
    data = dict(u_id=u_id)
    response = dict(code=code, message=message, data=data)
    return jsonify(response)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(parse_args.port), debug=True)

这是api.py最后一段,实现的是注册接口,简单的说就是调用注册服务,如果已经注册则直接返回,否则存储用户信息。

关键在于注册服务的实现。

运行 python api.py 打开 http://localhost:5000/apidocs/可以看到如下界面:

点开其中一个API,可以看到如下界面:

非常方便进行API接口调试。

接下来重点来了,演示微服务代码部分:

import random
from nameko.rpc import rpc
import sys
sys.path.append("..")
from dependence.services import RedisService

class RegisterService(object):
    name = "register"

    def __init__(self):
        self.redis_handle = RedisService()

    @rpc
    def check_registered(self, u_id):
        is_registered, user_data =   self.redis_handle.check_registered_and_get_info(u_id)
        if is_registered:
            return user_data
        return None

    @staticmethod
    def generate_u_id():
        """
        Test Function
        """
        return str(random.randint(7000000, 9999999))

    @rpc
    def register(self, email, user_data):
        u_id = self.redis_handle.check_email_is_registered(email)
        if u_id:
            return u_id, "already registered."
        u_id = self.generate_u_id()
        register_result = self.redis_handle.register(u_id, email, user_data)
        if register_result:
            return u_id, ""
        return None, "register failed.

关注register的实现,需要导入nameko.rpc,并且用rpc装饰该函数。实现非常简单,里面代码就是逻辑部分,生成u_id,然后存储到redis中。

这样就实践了第一个功能,在API中调用微服务。

接下来看看 push 服务的实现:

import random
from nameko.rpc import rpc, RpcProxy
import sys
sys.path.append("..")
from dependence.services import RedisService

class PushService(object):
    name = "push"
    register_rpc = RpcProxy("register")

    @rpc
    def push(self, u_id, content):
        user_data = self.register_rpc.check_registered(u_id)
        if not user_data:
        print("User:{} not existed.".format(u_id))
            return False, "not registered."
        language, country = user_data["language"], user_data["country"]

        # get language push content
        print("Push Progress: u_id: {} language: {}, country: {}, content: {}".
              format(u_id, language, country, content))

        return True, "push success."

push服务中需要调用注册服务,判断文章作者是否注册(其实能够发表文章肯定是已经注册,这里是指演示),这样就微服务中调用微服务,需要额外import RpcProxy,指定 注册服务 RpcProxy("register"),然后再服务中调用即可,并且拿到用户的信息,判断语言和国家,推送对应的语言内容。

整体来讲,Nameko这个框架,代码层实现非常简单,轻量级,简单实用。但是功能不全,Python 后端应用场景不多。

3.2.3 调试

开三个终端,分别运行:

cd microservices & nameko run push
cd microservices & nameko run register

cd app & python api.py

打开http://localhost:5000/apidocs/#/

准备注册数据:

注册信息1
{
  "country": "CN",
  "email": "nameko@nameko.com",
  "language": "ZH",
  "name": "xiaohua",
  "timestamp": 1553652949
}

注册信息2
{
  "country": "CN",
  "email": "nameko2@nameko.com",
  "language": "ZH",
  "name": "xiaoming",
  "timestamp": 1553652950
}

操作示例:

返回信息:

返回信息1
{
  "code": 0,
  "data": {
    "u_id": "7434029"
  },
  "message": ""
}

返回信息2
{
  "code": 0,
  "data": {
    "u_id": "8240184"
  },
  "message": ""
}

调用push接口:

点击执行execute。

返回信息:

python api.py 终端打印信息:

Push Message: article_u_id: 7434029
push result: True, message: push success.
Save Comment Data: article_id: 100 content: very good.
127.0.0.1 - - [27/Mar/2019 23:24:07] "POST /api/v1/comment HTTP/1.1" 200 -

Python 微服务就演示到这里,完整代码 github 地址github.com/CrystalSkyZ…

下一篇聊一下RPC。

更多精彩文章,请关注公众号『天澄技术杂谈』