【发布时间】:2015-09-10 17:10:29
【问题描述】:
我正在尝试解决 celery 中的一个问题:
- 我有一个任务会查询 API 以获取 id,然后为每个任务启动一个子任务。
- 我提前不知道 ID 是什么,或者有多少。
- 对于每个 id,我都会进行一次大计算,然后将一些数据转储到数据库中。
- 所有子任务完成后,我想运行一个汇总函数(将数据库结果导出为 Excel 格式)。
- 理想情况下,我不想阻止我的主要工作人员查询子任务的状态(如果你这样做,Celery 会生气。)
这个问题看起来非常相似(如果不相同?):Celery: Callback after task hierarchy
所以使用“解决方案”(这是this discussion的链接,我尝试了以下测试脚本:
# test.py
from celery import Celery, chord
from celery.utils.log import get_task_logger
app = Celery('test', backend='redis://localhost:45000/10?new_join=1', broker='redis://localhost:45000/11')
app.conf.CELERY_ALWAYS_EAGER = False
logger = get_task_logger(__name__)
@app.task(bind=True)
def get_one(self):
print('hello world')
self.replace(get_two.s())
return 1
@app.task
def get_two():
print('Returning two')
return 2
@app.task
def sum_all(data):
print('Logging data')
logger.error(data)
return sum(data)
if __name__ == '__main__':
print('Running test')
x = chord(get_one.s() for i in range(3))
body = sum_all.s()
result = x(body)
print(result.get())
print('Finished w/ test')
这对我不起作用。我收到一个错误:
AttributeError: 'get_one' 对象没有属性 'replace'
请注意,我确实在我的后端 URL 中有 new_join=1,但不是代理。如果我把它放在那里,我会得到一个错误:
TypeError: _init_params() 得到了一个意外的关键字参数“new_join”
我做错了什么?我正在使用 Python 3.4.3 和以下软件包:
amqp==1.4.6
anyjson==0.3.3
billiard==3.3.0.20
celery==3.1.18
kombu==3.0.26
pytz==2015.4
redis==2.10.3
【问题讨论】: