【问题标题】:Celery chain not working with batches芹菜链不适用于批次
【发布时间】:2015-09-30 09:24:50
【问题描述】:

乍一看,我非常喜欢 Celery 中的“批处理”功能,因为我需要在调用 API 之前对一定数量的 ID 进行分组(否则我可能会被踢出局)。

不幸的是,在进行一点测试时,批处理任务似乎无法与 Canvas 的其他原语(在本例中为链)配合使用。例如:

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
        a.backend.mark_as_done(request.id, 42, request=request)
        print "filter_by_price " + str([r.args[0] for r in requests])

@a.task
def completed():
    print("complete")

所以,通过这个简单的工作流程:

chain(get_price.s("ID_1"), completed.si()).delay()

我看到了这个输出:

[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']

5 秒后,filter_by_price() 就像预期的那样被触发。问题是 completed() 永远不会被调用。

对这里可能发生的事情有任何想法吗? 如果不使用批处理,有什么好的方法可以解决这个问题?

PS:我已经像文档说的那样设置了CELERYD_PREFETCH_MULTIPLIER=0

【问题讨论】:

  • 只是为了记录,我非常需要批处理的东西,我最终只使用 RabbitMQ + Pika 和一个非常简单的工作模板来缓冲消息。如果有人有兴趣,我有可用的源代码,干杯。

标签: python rabbitmq celery celery-task


【解决方案1】:

看起来批处理任务的行为与普通任务有很大不同。批处理任务甚至没有发出像task_success 这样的信号。

由于需要在get_price之后调用completed任务,所以可以直接从get_price本身调用。

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
         # do something
    completed.delay()

【讨论】:

    猜你喜欢
    • 2013-10-30
    • 1970-01-01
    • 2014-02-07
    • 2011-10-05
    • 1970-01-01
    • 1970-01-01
    • 2019-02-12
    • 2018-12-07
    • 2012-12-05
    相关资源
    最近更新 更多