一、概念

在一个应用服务中, 对于时效性要求没那么高的业务场景,我们没必要等到所有任务执行完才返回结果, 例如用户注册场景中, 保存了用户账号密码之后. 就可以立即返回, 后续的账号激活邮件, 可以用一个种异步的形式去处理, 这种异步操作可以⽤队列服务来实现. 否则, 如果等到邮件发送成功可能⼏秒过去了.

Celery是Python语言实现的分布式队列服务, 除了支持持即时任务, 还支持定时任务, Celery有5个核心角色.https://docs.celeryproject.org/en/stable/

1.Task

任务(Task)就是你要做的事情, 例如一个注册流程里面有很多任务, 给用户发验证邮件就是一个任务, 这种耗时任务可以交给Celery去处理; 还有一种任务是定时任务, 比如每天定时统计网站的注册人数, 这个也可以交给Celery周期性的处理.

2.Broker

Broker的中文意思是经纪人, 指为市场上买卖双方提供中介服务的人. 在是Celery中它介于生产者和消费者之间经纪人, 这个角色相当于数据结构中的队列. 例如一个Web系统中, 生产者是处理核心业务的Web程序, 业务中可能会产生一些耗时的任务; 比如短信 生产者会将任务发送给Broker, 就是把这个任务暂时放到队列中, 等待消费者来处理. 消费者是Worker, 是专门用于执行任务的后台服务. Worker将实时监控队列中是否有新的任务, 如果有就拿出来进行处理. Celery本身不提供队列服务, 一般用Redis或者RabbitMQ来扮演Broker的角色.

3.Worker

Worker 就是那个一直在后台执行任务的人, 也称为任务的消费者, 它会实时地监控队列中有没有任务, 如果有就立即取出来执行.

4.Beat

Beat是一个定时任务调度器, 它会根据配置定时将任务发送给Broker, 等待Worker来消费.

5.Backend

Backend用于保存任务的执行结果, 每个任务都有返回值, 比如发送邮件的服务会告诉我们有没有发送成功, 这个结果就是存在Backend中.

          Celery基本使用--->django-celery实现异步HTTP请求

 

 

二、环境搭建

Python3.6.8

通过pip 安装 celery、redis
pip install celery==3.1.26.post2
pip install redis==2.10.6
安装redis服务 redis-3.2.1.tar.gz参考:https://www.cnblogs.com/linux985/p/11344273.html

 

三、测试小栗子

为了测试Celery能否工作,我运行了一个最简单的任务,编写tasks.py,如下图所示

import time
from celery import Celery, platforms
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379'
platforms.C_FORCE_ROOT = True #允许在root下运行
app = Celery(__file__, broker=broker, backend=backend)
@app.task
def add(x, y):
    time.sleep(5)
    return x + y

上面的代码做了几件事:
创建了一个Celery实例app
指定消息中间件用redis, URL为redis://127.0.0.1:6379
指定存储用 redis, URL为redis://127.0.0.1:6379
创建了一个Celery任务add, 当函数被@app.task装饰后, 就成为可被 Celery 调度的任务。

 

启动Celery任务
celery worker -A tasks --loglevel=info
参数 -A 指定了Celery实例的位置, 本例是在 tasks.py 中,Celery会自动在该文件中寻找Celery对象实例
参数 --loglevel 指定了日志级别, 默认为 warning, 也可以使用 -l info 来表示。

然后看到界面显示结果如下:

Celery基本使用--->django-celery实现异步HTTP请求

我们可以看到Celery正常工作在名称centos版本为3.1.10 ,在下面的[config]中我们可以看到当前APP的名称tasks,运输工具transport就是我们在程序中设置的中间人redis://127.0.0.1:6379//,result redis://127.0.0.1:6379//,然后我们也可以看到worker缺省使用perfork来执行并发,当前并发数显示为4,然后可以看到下面的[queues]就是我们说的队列,当前默认的队列是celery,然后我们看到下面的[tasks]中有一个任务tasks.add.

了解了这些之后,根据文档我重新打开一个terminal,然后执行Python,进入Python交互界面,用delay()方法调用任务,执行如下操作:

Celery基本使用--->django-celery实现异步HTTP请求

 t.ready() ##判断任务是否执行完毕

 t.get() ##获取任务执行结果

可以看到, 虽然任务函数add需要等待5秒才返回执⾏结果, 但由于它是⼀个异步任务, 不会阻塞当前的主程序.

 

这个任务已经由之前启动的Worker异步执行了,然后我打开之前启动的worker的控制台,对输出进行查看验证,结果如下:

Celery基本使用--->django-celery实现异步HTTP请求

10:59:09这一行说明worker收到了一个任务:tasks.add,这里我们和之前发送任务返回的AsyncResult对比我们发现,每个task都有一个唯一的ID,第二行说明了这个任务执行succeed,执行结果为3。

查看资料说调用任务后会返回一个AsyncResult实例,可用于检查任务的状态,等待任务完成或获取返回值(如果任务失败,则为异常和回溯)。但这个功能默认是不开启的,需要设置一个 Celery 的结果后端(backend),这块我在下一个例子中进行了学习。

通过这个例子后我对Celery有了初步的了解,然后我在这个例子的基础上去进一步的学习。

1.2 配置文件
在上面的例子中, 我们直接把Broker和 Backend的配置写在了程序当中, 更好的做法是将配置项统一写入到一个配置文件中.

1.2.1目录结构:
celery_demo # 项⽬根⽬录
├── celery_app # 存放 celery 相关⽂件
│ ├── __init__.py
│ ├── config.py # 配置⽂件
│ ├── task1.py # 任务⽂件 1
│ └── task2.py # 任务⽂件 2
└── client.py # 应⽤程序

1.2.2 config.py文件中内容
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC
CELERY_IMPORTS = ( # 指定导⼊的任务模块
'celery_app.task1',
'celery_app.task2'
)

1.2.3 __init__.py文件内容
# -*- coding: utf-8 -*-
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.config') # 通过 Celery 实例加载配置模块

1.2.4 task1.py
import time
from celery_app import app
@app.task
def add(x, y):
    time.sleep(2)
    return x + y

1.2.5 task2.py
from celery_app import app
import time
@app.task
def say():
    time.sleep(2)
    return 'helo'
    
1.2.6 client.py
from celery_app import task1
from celery_app import task2
task1.add.delay(1, 2)
task2.say.delay()

1.2.7 启动 celery worker
(websocket) [root@gitlab celery_demo]# celery -A celery_app worker -l info

1.2.8 执行client.py
python client.py

1.2.9 运行python client.py后它会发送两个异步任务到Broker, 在Worker的窗口我们可以看到如下输出:
[2018-10-19 06:52:31,389: INFO/MainProcess] Received task: celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24]
[2018-10-19 06:52:31,391: INFO/MainProcess] Received task: celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64]
[2018-10-19 06:52:33,394: INFO/ForkPoolWorker-2] Task celery_app.task2.say[d945e419-aa93-4a2c-aec4-105f81031a64] succeeded in 2.00137619101s: 'helo'
[2018-10-19 06:52:33,394: INFO/ForkPoolWorker-1] Task celery_app.task1.add[b32962fe-dd61-443f-bc87-e666db957f24] succeeded in 2.00418746s: 3

  

 

1.3 定时任务
Celery 除了可以执行异步任务, 也支持执行周期性任务(Periodic Tasks),或者说定时任务, Celery Beat 进程通过读取配置文件的内容, 周期性地将定时任务发往任务队列.

1.3.1 修改配置文件, 增加定时任务
from celery.schedules import crontab
from datetime import timedelta
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
CELERYBEAT_SCHEDULE = {
"task1": {
"task": "celery_app.task1.add",
"schedule": timedelta(seconds=1),
"args":(1, 2),
},
"task2": {
"task": "celery_app.task2.say",
"schedule": timedelta(seconds=2),
"args":(),
},
}

1.3.2 启动celery
celery -B -A celery_app worker --loglevel=info

1.3.3 在worker窗口查看任务输出
[2018-10-19 07:40:31,966: INFO/ForkPoolWorker-2] Task celery_app.task1.add[6b216f23-036a-48ed-b14f-e252bf3f1ffb] succeeded in 2.001080302s: 3
[2018-10-19 07:40:31,968: INFO/ForkPoolWorker-3] Task celery_app.task2.say[28246df4-0eff-49f0-b468-f92b85fe97b3] succeeded in 2.00303293001s: 'helo'
##定时任务遇到的问题就是不能对托管的定时任务做动态更新, 需要重启 celery beat ..

  

 

Celery基本使用--->django-celery实现异步HTTP请求
#进阶用法
1.1 Celery异步函数回调
经过快速入门的学习后,我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的,所以我们需要更深入的去了解 Celery 更多的使用方式。
首先来看之前的task:
import time
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True
from celery.app.task import Task
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379'
app = Celery(__file__, broker=broker, backend=backend)
@app.task
def add(x, y):
    time.sleep(2)
    return x / y
这里的装饰器app.task实际上是将一个正常的函数修饰成了一个celery task对象, 所以这里我们可以给修饰器加上参数来决定修饰后的task对象的一些属性, 我们也可以自己复写task类然后让这个自定义task修饰函数
add, 来做一些自定义操作, 比如celery修饰的函数执行成功 失败 执行完毕时的行为.
import time
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True
from celery.app.task import Task
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379'
app = Celery(__file__, broker=broker, backend=backend)
class Mytask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print 'task success 11111'
        return super(Mytask, self).on_success(retval, task_id, args, kwargs)
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print 'task failed'
        return super(Mytask, self).on_failure(exc, task_id, args, kwargs, einfo)
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        print 'this is after return'
        return super(Mytask, self).after_return(status, retval, task_id, args, kwargs, einfo)
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        print 'this is retry'
        return super(Mytask,self).on_retry(exc, task_id, args, kwargs, einfo)

@app.task(base=Mytask)
def add(x, y):
    time.sleep(2)
    return x / y

celery -A tasks worker --loglevel=info ##启动任务

In [2]: t = add.delay(1, 2) ##执⾏celery函数
在worker中会有类似如下输出:
[2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111
[2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return
[2018-10-19 15:05:18,988: INFO/MainProcess] Task task.add[c41343cf-e5ca-49f7-9478-63f4c2e2797f] succeeded in 2.01661233298s: 0

In [3]: t = add.delay(1, 0) ##执⾏celery函数, 此函数会由于函数代码中分分母0而报错.
在worker中会有类似如下输出:
[2018-10-19 15:13:38,260: WARNING/Worker-2] task failed
[2018-10-19 15:13:38,261: WARNING/Worker-2] this is after return
[2018-10-19 15:13:38,262: ERROR/MainProcess] Task task.add[12a852cd-726e-44a0-97a8-eb17308c354e] raised unexpected: ZeroDivisionError('integer division or modulo by zero',)
Traceback (most recent call last):
File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/app_shell/websocket/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/app_shell/websocket/demo/task.py", line 38, in add
return x / y
ZeroDivisionError: integer division or modulo by zero

1.2 将修饰函数成为Task类的绑定方法, 执行中的任务获取到了自己执行任务的各种信息, 可以根据这些信息做很多其他操作.
import time
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True
from celery.app.task import Task
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379'
app = Celery(__file__, broker=broker, backend=backend)
@app.task(base=Mytask, bind=True)
def add(self, x, y):
    print self.request
    time.sleep(2)
    return x / y

celery -A tasks worker --loglevel=info ##启动任务

In [2]: t = add.delay(1, 2)
在worker中会有类似如下输出:
2019/7/14 2.Celery进阶使用
127.0.0.1:8888/notebooks/Celery/2.Celery进阶使用.ipynb 2/2
In [ ]:
[2018-10-19 15:05:16,972: WARNING/Worker-1] <Context: {'chord': None, 'retries': 0, 'args': (1, 2), u'is_eager': False, u'correlation_id': u'c41343cf-e5ca-49f7-9478-
63f4c2e2797f', 'errbacks': None, 'taskset': None, 'id': 'c41343cf-e5ca-49f7-9478-63f4c2e2797f', u'headers': {}, 'called_directly': False, 'utc': True, 'task':
'task.add', u'group': None, 'callbacks': None, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'},
u'hostname': 'celery@gitlab.example.com', 'expires': None, 'timelimit': (None, None), 'eta': None, 'kwargs': {}, u'reply_to': u'46af07e6-8e03-3574-9608-
fc8c0b10a98e', '_protected': 1}>
[2018-10-19 15:05:18,986: WARNING/Worker-1] task success 11111
[2018-10-19 15:05:18,987: WARNING/Worker-1] this is after return
进阶用法

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-08-10
  • 2022-12-23
  • 2019-08-07
  • 2018-03-12
  • 2021-10-14
猜你喜欢
  • 2021-10-15
  • 2021-08-10
  • 2022-12-23
  • 2022-03-08
  • 2022-01-19
  • 2021-08-07
  • 2022-01-13
相关资源
相似解决方案