一、概念
在一个应用服务中, 对于时效性要求没那么高的业务场景,我们没必要等到所有任务执行完才返回结果, 例如用户注册场景中, 保存了用户账号密码之后. 就可以立即返回, 后续的账号激活邮件, 可以用一个种异步的形式去处理, 这种异步操作可以⽤队列服务来实现. 否则, 如果等到邮件发送成功可能⼏秒过去了.
Celery是Python语言实现的分布式队列服务, 除了支持持即时任务, 还支持定时任务, Celery有5个核心角色.https://docs.celeryproject.org/en/stable/
1.Task
任务(Task)就是你要做的事情, 例如一个注册流程里面有很多任务, 给用户发验证邮件就是一个任务, 这种耗时任务可以交给Celery去处理; 还有一种任务是定时任务, 比如每天定时统计网站的注册人数, 这个也可以交给Celery周期性的处理.
2.Broker
Broker的中文意思是经纪人, 指为市场上买卖双方提供中介服务的人. 在是Celery中它介于生产者和消费者之间经纪人, 这个角色相当于数据结构中的队列. 例如一个Web系统中, 生产者是处理核心业务的Web程序, 业务中可能会产生一些耗时的任务; 比如短信 生产者会将任务发送给Broker, 就是把这个任务暂时放到队列中, 等待消费者来处理. 消费者是Worker, 是专门用于执行任务的后台服务. Worker将实时监控队列中是否有新的任务, 如果有就拿出来进行处理. Celery本身不提供队列服务, 一般用Redis或者RabbitMQ来扮演Broker的角色.
3.Worker
Worker 就是那个一直在后台执行任务的人, 也称为任务的消费者, 它会实时地监控队列中有没有任务, 如果有就立即取出来执行.
4.Beat
Beat是一个定时任务调度器, 它会根据配置定时将任务发送给Broker, 等待Worker来消费.
5.Backend
Backend用于保存任务的执行结果, 每个任务都有返回值, 比如发送邮件的服务会告诉我们有没有发送成功, 这个结果就是存在Backend中.
二、环境搭建
Python3.6.8
通过pip 安装 celery、redis
pip install celery==3.1.26.post2
pip install redis==2.10.6
安装redis服务 redis-3.2.1.tar.gz参考:https://www.cnblogs.com/linux985/p/11344273.html
三、测试小栗子
为了测试Celery能否工作,我运行了一个最简单的任务,编写tasks.py,如下图所示
import time from celery import Celery, platforms broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' platforms.C_FORCE_ROOT = True #允许在root下运行 app = Celery(__file__, broker=broker, backend=backend) @app.task def add(x, y): time.sleep(5) return x + y
上面的代码做了几件事:
创建了一个Celery实例app
指定消息中间件用redis, URL为redis://127.0.0.1:6379
指定存储用 redis, URL为redis://127.0.0.1:6379
创建了一个Celery任务add, 当函数被@app.task装饰后, 就成为可被 Celery 调度的任务。
启动Celery任务
celery worker -A tasks --loglevel=info
参数 -A 指定了Celery实例的位置, 本例是在 tasks.py 中,Celery会自动在该文件中寻找Celery对象实例
参数 --loglevel 指定了日志级别, 默认为 warning, 也可以使用 -l info 来表示。
然后看到界面显示结果如下:
我们可以看到Celery正常工作在名称centos版本为3.1.10 ,在下面的[config]中我们可以看到当前APP的名称tasks,运输工具transport就是我们在程序中设置的中间人redis://127.0.0.1:6379//,result redis://127.0.0.1:6379//,然后我们也可以看到worker缺省使用perfork来执行并发,当前并发数显示为4,然后可以看到下面的[queues]就是我们说的队列,当前默认的队列是celery,然后我们看到下面的[tasks]中有一个任务tasks.add.
了解了这些之后,根据文档我重新打开一个terminal,然后执行Python,进入Python交互界面,用delay()方法调用任务,执行如下操作:
t.ready() ##判断任务是否执行完毕
t.get() ##获取任务执行结果
可以看到, 虽然任务函数add需要等待5秒才返回执⾏结果, 但由于它是⼀个异步任务, 不会阻塞当前的主程序.
这个任务已经由之前启动的Worker异步执行了,然后我打开之前启动的worker的控制台,对输出进行查看验证,结果如下:
10:59:09这一行说明worker收到了一个任务:tasks.add,这里我们和之前发送任务返回的AsyncResult对比我们发现,每个task都有一个唯一的ID,第二行说明了这个任务执行succeed,执行结果为3。
查看资料说调用任务后会返回一个AsyncResult实例,可用于检查任务的状态,等待任务完成或获取返回值(如果任务失败,则为异常和回溯)。但这个功能默认是不开启的,需要设置一个 Celery 的结果后端(backend),这块我在下一个例子中进行了学习。
通过这个例子后我对Celery有了初步的了解,然后我在这个例子的基础上去进一步的学习。
1.2 配置文件
在上面的例子中, 我们直接把Broker和 Backend的配置写在了程序当中, 更好的做法是将配置项统一写入到一个配置文件中.
1.2.1目录结构:
celery_demo # 项⽬根⽬录
├── celery_app # 存放 celery 相关⽂件
│ ├── __init__.py
│ ├── config.py # 配置⽂件
│ ├── task1.py # 任务⽂件 1
│ └── task2.py # 任务⽂件 2
└── client.py # 应⽤程序
1.2.2 config.py文件中内容
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC
CELERY_IMPORTS = ( # 指定导⼊的任务模块
'celery_app.task1',
'celery_app.task2'
)
1.2.3 __init__.py文件内容
# -*- coding: utf-8 -*-
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.config') # 通过 Celery 实例加载配置模块
1.2.4 task1.py
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
1.2.5 task2.py
from celery_app import app
import time
@app.task
def say():
time.sleep(2)
return 'helo'
1.2.6 client.py
from celery_app import task1
from celery_app import task2
task1.add.delay(1, 2)
task2.say.delay()
1.2.7 启动 celery worker
(websocket) [root@gitlab celery_demo]# celery -A celery_app worker -l info
1.2.8 执行client.py
python client.py
1.2.9 运行python client.py后它会发送两个异步任务到Broker, 在Worker的窗口我们可以看到如下输出:
[2018-10-19 06:52:31,389: INFO/MainProcess] Received task: celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24]
[2018-10-19 06:52:31,391: INFO/MainProcess] Received task: celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64]
[2018-10-19 06:52:33,394: INFO/ForkPoolWorker-2] Task celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64] succeeded in 2.00137619101s: 'helo'
[2018-10-19 06:52:33,394: INFO/ForkPoolWorker-1] Task celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24] succeeded in 2.00418746s: 3
1.3 定时任务
Celery 除了可以执行异步任务, 也支持执行周期性任务(Periodic Tasks),或者说定时任务, Celery Beat 进程通过读取配置文件的内容, 周期性地将定时任务发往任务队列.
1.3.1 修改配置文件, 增加定时任务
from celery.schedules import crontab
from datetime import timedelta
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
CELERYBEAT_SCHEDULE = {
"task1": {
"task": "celery_app.task1.add",
"schedule": timedelta(seconds=1),
"args":(1, 2),
},
"task2": {
"task": "celery_app.task2.say",
"schedule": timedelta(seconds=2),
"args":(),
},
}
1.3.2 启动celery
celery -B -A celery_app worker --loglevel=info
1.3.3 在worker窗口查看任务输出
[2018-10-19 07:40:31,966: INFO/ForkPoolWorker-2] Task celery_app.task1.add[6b216f23-036a-48ed-b14f-e252bf3f1ffb] succeeded in 2.001080302s: 3
[2018-10-19 07:40:31,968: INFO/ForkPoolWorker-3] Task celery_app.task2.say[28246df4-0eff-49f0-b468-f92b85fe97b3] succeeded in 2.00303293001s: 'helo'
##定时任务遇到的问题就是不能对托管的定时任务做动态更新, 需要重启 celery beat ..
#进阶用法 1.1 Celery异步函数回调 经过快速入门的学习后,我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的,所以我们需要更深入的去了解 Celery 更多的使用方式。 首先来看之前的task: import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) @app.task def add(x, y): time.sleep(2) return x / y 这里的装饰器app.task实际上是将一个正常的函数修饰成了一个celery task对象, 所以这里我们可以给修饰器加上参数来决定修饰后的task对象的一些属性, 我们也可以自己复写task类然后让这个自定义task修饰函数 add, 来做一些自定义操作, 比如celery修饰的函数执行成功 失败 执行完毕时的行为. import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) class Mytask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task success 11111' return super(Mytask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task failed' return super(Mytask, self).on_failure(exc, task_id, args, kwargs, einfo) def after_return(self, status, retval, task_id, args, kwargs, einfo): print 'this is after return' return super(Mytask, self).after_return(status, retval, task_id, args, kwargs, einfo) def on_retry(self, exc, task_id, args, kwargs, einfo): print 'this is retry' return super(Mytask,self).on_retry(exc, task_id, args, kwargs, einfo) @app.task(base=Mytask) def add(x, y): time.sleep(2) return x / y celery -A tasks worker --loglevel=info ##启动任务 In [2]: t = add.delay(1, 2) ##执⾏celery函数 在worker中会有类似如下输出: [2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111 [2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return [2018-10-19 15:05:18,988: INFO/MainProcess] Task task.add[c41343cf-e5ca-49f7-9478-63f4c2e2797f] succeeded in 2.01661233298s: 0 In [3]: t = add.delay(1, 0) ##执⾏celery函数, 此函数会由于函数代码中分分母0而报错. 在worker中会有类似如下输出: [2018-10-19 15:13:38,260: WARNING/Worker-2] task failed [2018-10-19 15:13:38,261: WARNING/Worker-2] this is after return [2018-10-19 15:13:38,262: ERROR/MainProcess] Task task.add[12a852cd-726e-44a0-97a8-eb17308c354e] raised unexpected: ZeroDivisionError('integer division or modulo by zero',) Traceback (most recent call last): File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task R = retval = fun(*args, **kwargs) File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__ return self.run(*args, **kwargs) File "/app_shell/websocket/demo/task.py", line 38, in add return x / y ZeroDivisionError: integer division or modulo by zero 1.2 将修饰函数成为Task类的绑定方法, 执行中的任务获取到了自己执行任务的各种信息, 可以根据这些信息做很多其他操作. import time from celery import Celery, platforms platforms.C_FORCE_ROOT = True from celery.app.task import Task broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379' app = Celery(__file__, broker=broker, backend=backend) @app.task(base=Mytask, bind=True) def add(self, x, y): print self.request time.sleep(2) return x / y celery -A tasks worker --loglevel=info ##启动任务 In [2]: t = add.delay(1, 2) 在worker中会有类似如下输出: 2019/7/14 2.Celery进阶使用 127.0.0.1:8888/notebooks/Celery/2.Celery进阶使用.ipynb 2/2 In [ ]: [2018-10-19 15:05:16,972: WARNING/Worker-1] <Context: {'chord': None, 'retries': 0, 'args': (1, 2), u'is_eager': False, u'correlation_id': u'c41343cf-e5ca-49f7-9478- 63f4c2e2797f', 'errbacks': None, 'taskset': None, 'id': 'c41343cf-e5ca-49f7-9478-63f4c2e2797f', u'headers': {}, 'called_directly': False, 'utc': True, 'task': 'task.add', u'group': None, 'callbacks': None, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, u'hostname': 'celery@gitlab.example.com', 'expires': None, 'timelimit': (None, None), 'eta': None, 'kwargs': {}, u'reply_to': u'46af07e6-8e03-3574-9608- fc8c0b10a98e', '_protected': 1}> [2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111 [2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return