Celery定时任务

2,515 阅读1分钟
原文链接: www.cnblogs.com

celery支持定时任务,设定好任务的执行时间,celery就会定时自动执行, 这个定时任务模块叫celery beat

 

示例

#cat test.py

from celery import Celery
from celery.schedules import crontab

app = Celery(
            broker='redis://10.10.10.11',
            backend='redis://10.10.10.11'
)
@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, test_celery.s('hello'), name='add every 10') #每10秒  test_celery为下面定义的函数,‘hello’为传入的参数 sender.add_periodic_task(30.0, test_celery.s('world'), expires=10) #每30秒发送一次 sender.add_periodic_task( crontab(hour=13, minute=50, day_of_week=3), #每周一7点半 test_celery.s('Happy Mondays!'), ) @app.task def test_celery(arg): print(arg)

 

启动监听和worker

D:\django-project\wechat>celery -A test  beat            #启动任务调度器 celery beat,test为要执行的文件名
D:\django-project\wechat>celery -A test  worker -P eventlet    #启动worker,执行调度器发起的任务

 

使用crontab做定时任务

from celery.schedules import crontab

app.conf.beat_schedule = {

    'add-every-monday-morning': {
        'task': 'tasks.add',                        #task.add (为task.py下的add函数)
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),                          #args 为传入的参数
    },
}

 

示例

from celery import Celery
import subprocess

from celery.schedules import crontab

app = Celery("test",
            broker='redis://10.10.10.11',
            backend='redis://10.10.10.11'
)

app.conf.beat_schedule = {

    # Executes every Monday morning at 7:30 a.m.

    'add-every-monday-morning': {

        'task': 'test.cmd_run',

        'schedule': crontab(hour=7, minute=30, day_of_week=1),

        'args': ('ipconfig',),

    },

}
app.conf.timezone = 'UTC'

@app.task
def cmd_run(cmd):
    result = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    return result.stdout.read().decode("utf-8")

 

重启celery和监听