celery支持定时任务,设定好任务的执行时间,celery就会定时自动执行, 这个定时任务模块叫celery beat
示例
#cat test.py
from celery import Celery
from celery.schedules import crontab
app = Celery(
broker='redis://10.10.10.11',
backend='redis://10.10.10.11'
)
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, test_celery.s('hello'), name='add every 10') #每10秒 test_celery为下面定义的函数,‘hello’为传入的参数
sender.add_periodic_task(30.0, test_celery.s('world'), expires=10) #每30秒发送一次
sender.add_periodic_task(
crontab(hour=13, minute=50, day_of_week=3), #每周一7点半
test_celery.s('Happy Mondays!'),
)
@app.task
def test_celery(arg):
print(arg)
启动监听和worker
D:\django-project\wechat>celery -A test beat #启动任务调度器 celery beat,test为要执行的文件名
D:\django-project\wechat>celery -A test worker -P eventlet #启动worker,执行调度器发起的任务
使用crontab做定时任务
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-monday-morning': {
'task': 'tasks.add', #task.add (为task.py下的add函数)
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16), #args 为传入的参数
},
}
示例
from celery import Celery
import subprocess
from celery.schedules import crontab
app = Celery("test",
broker='redis://10.10.10.11',
backend='redis://10.10.10.11'
)
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'test.cmd_run',
'schedule': crontab(hour
=
7
, minute
=
30
, day_of_week
=
1
)
,
'args': ('ipconfig',),
},
}
app.conf.timezone = 'UTC'
@app.task
def cmd_run(cmd):
result = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
return result.stdout.read().decode("utf-8")
重启celery和监听