celery使用入门

3,510 阅读9分钟

一、Celery 简介

Celery是一个专注于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。

使用Celery的常见场景如下:

  1. Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
  2. 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
  3. 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

Celery还提供了如下的特性:

  1. 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
  2. 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
  3. 方便把任务和配置管理相关联。
  4. 可选多进程、Eventlet和Gevent三种模式并发执行。
  5. 提供错误处理机制。
  • 提供多种任务原语,方便实现任务分组、拆分和调用链。
  • 支持多种消息代理和存储后端。

二、Celery的架构

Celery 扮演生产者和消费者的角色

Celery包含如下组件:

  1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
  4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

Celery的架构图如图所示:

入门

任务发布者 就是单一的执行任务,trigger 定时任务调用 定时执行任务 worker 就是任务,后台启动的进程,shell方式调用

产生任务的方式:
  • 发布者发布任务(WEB 应用)
  • 任务调度按期发布任务(定时任务)
celery 依赖三个库: 这三个库, 都由 Celery 的开发者开发和维护
  • billiard : 基于 Python2.7 的 multisuprocessing 而改进的库, 主要用来提高性能和稳定性.
  • librabbitmp : C 语言实现的 Python 客户端,
  • kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级借口

三、消息代理

Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理,但适用于生产环境的只有RabbitMQ和Redis,至于其他的方式,一是支持有限,二是可能得不到更好的技术支持。

Celery官方推荐的是RabbitMQ,Celery的作者Ask Solem Hoel最初在VMware就是为RabbitMQ工作的,Celery最初的设计就是基于RabbitMQ,所以使用RabbitMQ会非常稳定,成功案例很多。如果使用Redis,则需要能接受发生突然断电之类的问题造成Redis突然终止后的数据丢失等后果。

四、Celery序列化

在客户端和消费者之间传输数据需要 序列化和反序列化. Celery 支出的序列化方案如下所示:

方案 说明
pickle pickle 是Python 标准库中的一个模块, 支持 Pyuthon 内置的数据结构, 但他是 Python 的专有协议. Celery 官方不推荐.
json json 支持多种语言, 可用于跨语言方案.
yaml yaml 表达能力更强, 支持的数据类型较 json 多, 但是
python 客户端的性能不如 json
msgpack 二进制的类 json 序列化方案, 但比 json 的数据结构更小, 更快.

五、安装,配置与简单示例

Celery 配置参数汇总

配置项 说明
CELERY_DEFAULT_QUEUE 默认队列
BROKER_URL 代理人即rabbitmq的网址
CELERY_BROKER_URL Broker 地址
CELERY_RESULT_BACKEND 结果存储地址
CELERY_TASK_SERIALIZER 任务序列化方式
CELERY_RESULT_SERIALIZER 任务执行结果序列化方式
CELERY_TASK_RESULT_EXPIRES 任务过期时间
CELERY_ACCEPT_CONTENT 指定任务接受的内容类型(序列化)

代码示例 :

普通脚本方式

app.py文件

from celery import Celery

app = Celery('hello', broker='redis://localhost:6379/')


@app.task
def hello():
    return 'hello world'

命令行启动

$ celery -A hello.app worker --loglevel=info

进入同级目录

(env_local) MacBook:hello clin$ python3
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 26 2018, 19:50:54) 
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from app import hello
>>> hello.delay()
<AsyncResult: 3c33b8d5-31c1-49b2-9a6b-0b1f2c0c1d04>
>>> 

同样支持文件格式

项目方式

文件目录结构

├── celery_project
│   ├── app.py
│   ├── celeryconfig.py
│   └── task.py
└── celery_project-test.py

安装$ pip install celery, redis, 没有RabbitMQ ,仅使用redis,不安装 msgpack

Celery 也定义了一组用于安装 Celery 和给定特性依赖的捆绑。

你可以在 requirements.txt 中指定或在 pip 命令中使用方括号。多个捆绑用逗号分隔。

$ pip install celery[librabbitmq]
或
$ pip install celery[librabbitmq,redis,auth,msgpack]

配置文件 celeryconfig.py

BROKER_URL = 'redis://localhost:6379/1'     # 配置消息队列,默认使用 RabbitMQ
# BROKER_URL = 'amqp://dongwm:123456@localhost:5672/web_develop' # 使用RabbitMQ作为消息代理,默认地址 'amqp://guest:**@127.0.0.1:5672/'
CELERY_BROKER_URL = 'redis://localhost:6379/2'  # 把任务结果存在了Redis 区分生成的key,使用不同的库,使用 keys * 查看 # 把任务结果存在了Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/3'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'           # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ["json"]            # 指定任务接受的内容类型

要验证你的配置文件可以正确工作,且不包含语法错误,你可以尝试导入它:

$ python -m celeryconfig

初始化文件 app.py ,celery 默认会到目录下找celery这个文件,尽管会引起文件名冲突,所以改名叫app

# from __future__ import absolute_import
from celery import Celery

app = Celery('celery_project', include=["celery_project.task"])
app.config_from_object("celery_project.celeryconfig")

if __name__ == "__main__":
    app.start()
    
    # When this module is executed the tasks will be named starting with “__main__”,
    # but when the module is imported by another process,
    # say to call a task, the tasks will be named starting with “tasks” (the real name of the module):
    # 当使用 __main__ 的方式导入模块,需要使用 app.worker_main() 或者 app.start() 的方式,等于导入真实的 task 下装饰的函数 add

1."from future import absolute_import"是拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行。因为没有使用 celery.py 命名,所以注释了 2.app是Celery类的实例,创建的时候添加了 celery_project.tasks这个模块,也就是包含了 celery_project/task.py这个文件。 3.把Celery配置存放进 celery_project/celeryconfig.py文件,使用app.config_from_object加载配置。

任务文件 task.py

# from __future__ import absolute_import
from celery_project.app import app


@app.task
def add(x, y):
    print('==========')
    return x + y    

task.py只有一个任务函数add,让它生效的最直接的方法就是添加app.task这个装饰器。

启动消费者,这个例子中没有任务调度相关的内容, 所以只需要启动消费者:

$ celery -A celery_project worker -l info # 指定项目下存在 celery.py 文件,默认启动文件
或者
$ celery -A celery_project.app worker -l info
-A参数默认会寻找proj.celery这个模块,其实使用celery作为模块文件名字不怎么合理。可以使用其他名字。举个例子,假如是proj/app.py,可以使用如下命令启动:

命令行启动输出

 -------------- celery@MacBook v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-19.0.0-x86_64-i386-64bit 2019-10-22 19:35:32
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_project:0x10505db38
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/3
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_project.task.add

[2019-10-22 19:35:32,418: INFO/MainProcess] Connected to redis://localhost:6379/1
[2019-10-22 19:35:32,428: INFO/MainProcess] mingle: searching for neighbors
[2019-10-22 19:35:33,448: INFO/MainProcess] mingle: all alone
[2019-10-22 19:35:33,460: INFO/MainProcess] celery@MacBook ready.

手动触发任务,也可以在命令行执行 celery_project-test.py

from celery_project.task import add
r = add.delay(1, 3)
print(r)
print(r.result)
print(r.status)
print(r.successful())
print(r.backend)

输出结果

abb2242a-1590-4240-ab6e-40a1e834bc42
None
PENDING
False
<celery.backends.redis.RedisBackend object at 0x1112d11d0>

work进程输出日志

[2019-10-22 19:51:38,945: INFO/MainProcess] Received task: celery_project.task.add[abb2242a-1590-4240-ab6e-40a1e834bc42]  
[2019-10-22 19:51:38,948: WARNING/ForkPoolWorker-8] ==========
[2019-10-22 19:51:38,956: INFO/ForkPoolWorker-8] Task celery_project.task.add[abb2242a-1590-4240-ab6e-40a1e834bc42] succeeded in 0.008541842995327897s: 4

六、使用任务调度

调用定时任务

from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'celery_project.task.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    }
}

CELERYBEAT_SCHEDULE中指定了task.add这个任务每3秒跑一次,执行的时候的参数是16和16。

单独启动celery

$ celery beat -A celery_project.app

celery beat输出结果

celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2019-10-22 20:36:13
Configuration ->
    . broker -> redis://localhost:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

然后再启动worker,输出结果,因为有堆积,所以一下有3个消费掉了

$ celery -A celery_project.app worker -l info
 
 -------------- celery@MacBook v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-19.0.0-x86_64-i386-64bit 2019-10-22 20:36:18
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_project:0x105b37b00
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/3
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_project.task.add

[2019-10-22 20:36:18,558: INFO/MainProcess] Connected to redis://localhost:6379/1
[2019-10-22 20:36:18,567: INFO/MainProcess] mingle: searching for neighbors
[2019-10-22 20:36:19,591: INFO/MainProcess] mingle: all alone
[2019-10-22 20:36:19,602: INFO/MainProcess] celery@MacBook ready.
[2019-10-22 20:36:19,833: INFO/MainProcess] Received task: celery_project.task.add[fb82b3e6-ef18-494b-be0e-8105e9f04cd4]  
[2019-10-22 20:36:19,835: INFO/MainProcess] Received task: celery_project.task.add[ba54b389-273a-4c6b-b8dd-6c3a50522992]  
[2019-10-22 20:36:19,836: INFO/MainProcess] Received task: celery_project.task.add[c55f7140-4477-4f15-945e-2728dcd039ac]  
[2019-10-22 20:36:19,836: WARNING/ForkPoolWorker-1] ==========
[2019-10-22 20:36:19,837: WARNING/ForkPoolWorker-8] ==========
[2019-10-22 20:36:19,839: WARNING/ForkPoolWorker-2] ==========
[2019-10-22 20:36:19,846: INFO/ForkPoolWorker-1] Task celery_project.task.add[ba54b389-273a-4c6b-b8dd-6c3a50522992] succeeded in 0.00965010700019775s: 32
[2019-10-22 20:36:19,846: INFO/ForkPoolWorker-8] Task celery_project.task.add[fb82b3e6-ef18-494b-be0e-8105e9f04cd4] succeeded in 0.009201998997014016s: 32
[2019-10-22 20:36:19,847: INFO/ForkPoolWorker-2] Task celery_project.task.add[c55f7140-4477-4f15-945e-2728dcd039ac] succeeded in 0.008784970996202901s: 32
[2019-10-22 20:36:22,554: INFO/MainProcess] Received task: celery_project.task.add[7281dd7c-cf58-411e-bd8c-ac0bc31168a8]  
[2019-10-22 20:36:22,556: WARNING/ForkPoolWorker-3] ==========
[2019-10-22 20:36:22,563: INFO/ForkPoolWorker-3] Task celery_project.task.add[7281dd7c-cf58-411e-bd8c-ac0bc31168a8] succeeded in 0.008048085001064464s: 32
[2019-10-22 20:36:25,553: INFO/MainProcess] Received task: celery_project.task.add[6894cdf8-f04d-4e7d-a895-0a3181b32b37]  
[2019-10-22 20:36:25,556: WARNING/ForkPoolWorker-4] ==========
[2019-10-22 20:36:25,563: INFO/ForkPoolWorker-4] Task celery_project.task.add[6894cdf8-f04d-4e7d-a895-0a3181b32b37] succeeded in 0.008065512003668118s: 32
^C
worker: Hitting Ctrl+C again will terminate all running tasks!

worker: Warm shutdown (MainProcess)

一起启动

$ celery -B -elery_project.app worker -l info

简书参考

知乎参考