动手实现一个简单的Celery

2,998 阅读10分钟

简介

Celery是一个由Python实现的分布式任务队列,任务队列通常有3个方面的功能。

  • 1.减缓高并发压力,先将任务写入队列,有空余资源再运行
  • 2.执行定时任务,先将任务写入队列,指定时间下再执行
  • 3.异步任务,web中存在耗时任务可以先将其写入队列,然后后台任务进程去执行

已经有很多文章来描述Celery的用法与简单原理,本篇文章也会简单提及,但不会费太多笔墨。

本篇重点在于,利用Python动手实现一个简单的Celery,并使用自己实现的Celery实现异步任务,与上一篇「Python Web:Flask异步执行任务」一样,通过Flask构建一个简单的web,然后执行耗时任务,希望前端可以通过进度条显示任务的进度。

需注意,这里不会去解读Celery的源码,其源码具有很多工程细节,比较复杂,这里只是从其本质出发,简单的实现一个玩具Celery,这个玩具Celery在稳定性、效率等方面当然不能与Celery相比,但可以很好的理解Celery大体是怎么实现的。

本文讲究的是「形离神合」,与Celery实现细节不同,但本质原理相同。

那我们开始吧!

Celery的概念与原理

Celery 5 个关键的概念,弄明白,就大致理解 Celery 了。

  • 1.Task(任务) 简单而言就是你要做的事情,如用户注册流程中的发送邮件

  • 2.Worker(工作者) 在后台处理Task的人

  • 3.Broker(经纪人) 本质是一种队列,Task 会交给 Broker ,Worker 会从 Broker 中取 Task ,并处理

  • 4.Beat 定时任务调度器,根据定的时间,向 Broker 中添加数据,然后等待 Worker 去处理

  • 5.Backend 用于保存 Worker 执行结果的对象,每个 Task 都要有返回值,这些返回值,就在 Backend 中

这里我们抛开这里的各种概念,从更本质的角度来看Celery,发现它就一个任务序列化存储与反序列化获取的过程。

以Web异步任务为例,使用方式通常为:

  • 1.有一个要长时间处理I/O的函数,如果不将其异步执行就会产生的阻塞,这通常是不被允许的
  • 2.启动一个后台任务执行进程
  • 3.当要执行耗时函数时,不会立刻同步运行,而是提取函数的关键数据,将其序列化存储到队列中,队列可以使用Redis或其他方式实现
  • 4.后台任务执行进程会从队列中获取数据,并将其反序列化还原
  • 5.后台任务执行进程会使用原来的函数以及还原的数据完成函数的执行,从而实现异步执行的效果。

流程并不复杂,Celery中不同的概念分别负责上面流程中的不同部分。

实现一个简单的Celery

接着我们来实现一个Celery,这里Celery选择Redis作为后端。

先来整理一个大体的框架。

首先我们需要定义一个Task类来表示要执行的任务,不同的任务要执行的具体逻辑由使用者自身编写。

接着要定义一个任务队列,即Celery中的Broker,用于存储要执行的任务。

随后要定义执行进程Worker,Worker要从Broker中获取任务并去执行。

最后还需要定义一个用于存储任务返回数据的类,即Celery中的Backend。

看上去有点复杂,不慌,其实很简单。

实现任务类

首先来实现task.py,用于定义任务相关的一些逻辑

# task.py
import abc
import json
import uuid
import traceback
import pickle

from broker import Broker
from backend import Backend

class BaseTask(abc.ABC):
    """
    Example Usage:
        class AdderTask(BaseTask):
            task_name = "AdderTask"
            def run(self, a, b):
                result = a + b
                return result
        adder = AdderTask()
        adder.delay(9, 34)
    """

    task_name = None

    def __init__(self):
        if not self.task_name:
            raise ValueError("task_name should be set")
        self.broker = Broker()
 
    @abc.abstractmethod # abstractmethod 派生类必须重写实现逻辑
    def run(self, *args, **kwargs):
        # 写上你具体的逻辑
        raise NotImplementedError("Task `run` method must be implemented.")

    # 更新任务状态
    def update_state(self, task_id, state, meta={}):
        _task = {"state": state, "meta": meta}
        serialized_task = json.dumps(_task)
        backend = Backend()
        backend.enqueue(queue_name=task_id, item=serialized_task)
        print(f"task info: {task_id} succesfully queued")

    # 异步执行
    def delay(self, *args, **kwargs):
        try:
            self.task_id = str(uuid.uuid4())
            _task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
            serialized_task = json.dumps(_task)
            # 加入redis中
            self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
            print(f"task: {self.task_id} succesfully queued")
        except Exception:
            # traceback.print_exc()
            raise Exception("Unable to publish task to the broker.")
        return self.task_id

# 获取数据
def async_result(task_id):
    backend = Backend()
    _dequeued_item = backend.dequeue(queue_name=task_id)
    dequeued_item = json.loads(_dequeued_item)
    state = dequeued_item["state"]
    meta = dequeued_item["meta"]
    class Info():
        def __init__(self, state, meta):
            self.state = state
            self.meta = meta
    info = Info(state, meta)
    return info

上述代码中,定义了BaseTask类,它继承自python的abc.ABC成为一个抽象基类,其中一开始便要求必须定义task_name,这是因为后面我们需要通过task_name去找对应的任务队列。

BaseTask类的run()方法被abc.abstractmethod装饰,该装饰器会要求BaseTask的派生类必须重写run()方法,这是为了让使用者可以自定义自己的任务逻辑。

BaseTask类的update_state()方法用于更新任务的状态,其逻辑很简单,先将参数进行JSON序列化,然后调用Backend的enqueue()方法将数据存入,这里的Backend其实是Redis实例,enqueue()方法会将数据写入Redis的list中,需要注意,这里list的key为task_id,即当前任务的id。

BaseTask类的delay()方法用于异步执行任务,首先通过uuid为任务创建一个唯一id,然后将方法的参数通过JSON序列化,然后调用Broker的enqueue()将数据存入,这里的Broker其实也是一个Redis实例,enqueue()方法同样是将数据写入到Redis的list中,只是list的key为task_name,即当前任务的名称。

此外还实现了async_result()方法,该方法用于异步获取任务的数据,通过该方法可以获得任务的执行结果,或任务执行中的各种数据,数据的结构是有简单约定的,必须要有state表示当然任务的状态,必须要有meta表示当前任务的一些信息。

实现Broker与Backend

在task.py中使用了Broker与Backend,那接着就来实现一下这两个,先实现Broker。

# broker.py
import redis # pip install redis

class Broker:
    """
    use redis as our broker.
    This implements a basic FIFO queue using redis.
    """
    def __init__(self):
        host = "localhost"
        port = 6379
        password = None
        self.redis_instance = redis.StrictRedis(
            host=host, port=port, password=password, db=0, socket_timeout=8.0
        )

    def enqueue(self, item, queue_name):
        self.redis_instance.lpush(queue_name, item)

    def dequeue(self, queue_name):
        dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
        if not dequed_item:
            return None
        dequed_item = dequed_item[1]
        return dequed_item

没什么可讲的,就是定了两个方法用于数据的存储与读取,存储使用lpush方法,它会将数据从左边插入到Redis的list中,读取数据使用brpop方法,它会从list的右边取出第一个元素,返回该元素值并从list删除,左进右出就构成了一个队列。

为了简便,Backend的代码与Broker一模一样,只是用来存储任务的信息而已,代码就不贴了。

后台任务执行进程Worker

接着来实现后台任务执行进程Worker

# worker.py
import json

class Worker:
    """
    Example Usage:
        task = AdderTask()
        worker = Worker(task=task)
        worker.start()
    """
    def __init__(self, task) -> None:
        self.task = task

    def start(self,):
        while True:
            try:
                _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
                dequeued_item = json.loads(_dequeued_item)
                task_id = dequeued_item["task_id"]
                task_args = dequeued_item["args"]
                task_kwargs = dequeued_item["kwargs"]
                task_kwargs['task_id'] = task_id
                self.task.run(*task_args, **task_kwargs)
                print("succesful run of task: {0}".format(task_id))
            except Exception:
                print("Unable to execute task.")
                continue   

上述代码中,定义了Worker类,Worker类在初始化时需要指定具体的任务实例,然后从broker中获取任务相关的数据,接着调用其中的run()方法完成任务的执行,比较简单。

使用玩具Celery

玩具Celery的关键结构都定义好了,接着就来使用一下它,这里依旧会使用「Python Web:Flask异步执行任务」文章中的部分代码,如前端代码,这里也不再讨论其前端代码,没有阅读可以先阅读一下,方便理解下面的内容。

首先定义出一个耗时任务

# app.py
class LongTask(BaseTask):

    task_name = "LongTask"

    def run(self, task_id):
        """Background task that runs a long function with progress reports."""
        verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
        adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
        noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']

        message = ''
        total = random.randint(10, 50)

        for i in range(total):
            if not message or random.random() < 0.25:
                message = '{0} {1} {2}...'.format(random.choice(verb),
                                                  random.choice(adjective),
                                                  random.choice(noun))
            self.update_state(task_id=task_id, state='PROGRESS',
                              meta={'current': i, 'total': total,
                                    'status': message})
            time.sleep(1)
        
        self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
        return

每个耗时任务都要继承在BaseTask并且重写其run()方法,run()方法中的逻辑就是当前这个耗时任务要处理的具体逻辑。

这里逻辑其实很简单,就是随机的从几个列表中抽取词汇而已。

在for迭代中,想要前端知道当前任务for迭代的具体情况,就需要将相应的数据通过BaseTask的update_state()方法将其更新到backend中,使用task_id作为Redis中list的key。

当逻辑全部执行完后,将状态为FINISH的信息存入backend中。

写一个接口来触发这个耗时任务

# app.py
@app.route('/longtask', methods=['POST'])
def longtask():
    long_task = LongTask()
    task_id = long_task.delay()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task_id)}

逻辑非常简单,实例化LongTask(),并调用其中的delay()方法,该方法会将当前任务存入认为队列中,当前的请求会将当前任务的task_id通过响应包头的中的taskstatus字段传递给前端。

前端获取到后,就可以通过task_id去获取当前任务执行状态等信息,从而实现前端的可视化。

接着定义相应的接口来获取当前任务的信息,调用用async_result()方法,将task_id传入则可。

# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
    info = async_result(task_id)
    print(info)
    if info.state == 'PENDING':
        response = {
            'state': info.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif info.state != 'FAILURE':
        response = {
            'state': info.state,
            'current': info.meta.get('current', 0),
            'total': info.meta.get('total', 1),
            'status': info.meta.get('status', '')
        }
        if 'result' in info.meta:
            response['result'] = info.meta['result']
    else:
        # something went wrong in the background job
        response = {
            'state': info.state,
            'current': 1,
            'total': 1,
            'status': str(info.meta),  # this is the exception raised
        }
    return jsonify(response)

最后,需要定义一个启动后台任务执行进程的逻辑

# run_worker.py
from worker import Worker
from app import LongTask

if __name__ == "__main__":
    long_task = LongTask()
    worker = Worker(task=long_task)
    worker.start()

至此,整体结构就构建完了,使用一下。

首先运行redis。

redis-server

然后运行Flask。

python app.py

最后启动一下后台任务执行进程,它相当于Celery的celery -A xxx worker --loglevel=info命令。

python run_worker.py

同时执行多个任务,效果如下

对应的一些打印如下:

python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
python app.py
 * Serving Flask app "app" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat

 * Debugger is active!
 * Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -

需要注意一些,上面的代码中,使用Worker需要实例化具体的任务,此时任务实例与app.py中通过接口创建的任务实例是不同的,Worker利用不同的实例,使用相同的参数,从而实现执行效果相同的目的。

代码已上传Githu:github.com/ayuLiao/toy…

如果你觉得文章有帮助,请按一下右下角的「在看」小星星,那是可以按的,谢谢。