【问题标题】:Use .replace method with Celery sub-tasks对 Celery 子任务使用 .replace 方法
【发布时间】:2015-09-10 17:10:29
【问题描述】:

我正在尝试解决 celery 中的一个问题:

  • 我有一个任务会查询 API 以获取 id,然后为每个任务启动一个子任务。
  • 我提前不知道 ID 是什么,或者有多少。
  • 对于每个 id,我都会进行一次大计算,然后将一些数据转储到数据库中。
  • 所有子任务完成后,我想运行一个汇总函数(将数据库结果导出为 Excel 格式)。
  • 理想情况下,我不想阻止我的主要工作人员查询子任务的状态(如果你这样做,Celery 会生气。)

这个问题看起来非常相似(如果不相同?):Celery: Callback after task hierarchy

所以使用“解决方案”(这是this discussion的链接,我尝试了以下测试脚本:

# test.py
from celery import Celery, chord
from celery.utils.log import get_task_logger

app = Celery('test', backend='redis://localhost:45000/10?new_join=1', broker='redis://localhost:45000/11')
app.conf.CELERY_ALWAYS_EAGER = False

logger = get_task_logger(__name__)


@app.task(bind=True)
def get_one(self):
    print('hello world')
    self.replace(get_two.s())
    return 1


@app.task
def get_two():
    print('Returning two')
    return 2


@app.task
def sum_all(data):
    print('Logging data')
    logger.error(data)
    return sum(data)


if __name__ == '__main__':
    print('Running test')
    x = chord(get_one.s() for i in range(3))
    body = sum_all.s()
    result = x(body)

    print(result.get())
    print('Finished w/ test')

这对我不起作用。我收到一个错误:

AttributeError: 'get_one' 对象没有属性 'replace'

请注意,我确实在我的后端 URL 中有 new_join=1,但不是代理。如果我把它放在那里,我会得到一个错误:

TypeError: _init_params() 得到了一个意外的关键字参数“new_join”

我做错了什么?我正在使用 Python 3.4.3 和以下软件包:

amqp==1.4.6
anyjson==0.3.3
billiard==3.3.0.20
celery==3.1.18
kombu==3.0.26
pytz==2015.4
redis==2.10.3

【问题讨论】:

    标签: python celery


    【解决方案1】:

    Task.replace 方法将在 Celery 3.2 中添加:http://celery.readthedocs.org/en/master/whatsnew-3.2.html#task-replace(该更改日志条目具有误导性,因为它表明 Task.replace 之前存在并且已被更改。)

    【讨论】:

    • 好的。谢谢你。是的,我已经阅读了变更日志,所以认为它在 3.1 中。我还通过源进行了搜索,并认为它在那里。
    猜你喜欢
    • 2012-03-04
    • 2014-11-03
    • 1970-01-01
    • 2021-06-07
    • 2011-06-19
    • 2013-08-30
    • 2017-06-27
    • 2021-12-21
    • 2017-02-10
    相关资源
    最近更新 更多