【发布时间】:2017-09-12 09:44:23
【问题描述】:
当我使用 celery group 和 chain 来安排如下任务时
(group([group_task]) | sum_task).apply_async()
组任务可以在多个worker中执行,在所有组任务完成后,sum_task开始执行(可能在另一个worker中),所以 谁能告诉我 celery 是怎么知道组任务全部完成然后开始 sum_task 的?
【问题讨论】:
当我使用 celery group 和 chain 来安排如下任务时
(group([group_task]) | sum_task).apply_async()
组任务可以在多个worker中执行,在所有组任务完成后,sum_task开始执行(可能在另一个worker中),所以 谁能告诉我 celery 是怎么知道组任务全部完成然后开始 sum_task 的?
【问题讨论】:
您可以为每个链式任务和组/和弦回调任务指定不同的队列。
片段喜欢:
@shared_task(name="analyze_atom", queue="atom")
def analyze_atom(image_urls, targetdir=target_path, studentuid=None):
return {}
@shared_task(name="summary_up", queue="summary")
def summary_up(rets, studentuid, images):
return {}
chord(analyze_atom.s([image]) for image in images)(summary_up.s(studentuid, images))
并且,当任务运行时,您可以检查代理内容,假设您使用rabbitmq作为代理,您可以通过rabbitmq管理插件或pyrabbit接口sn-p在这里检查队列深度:
from pyrabbit.api import Client
cl = Client('localhost:15672', 'guest', 'guest')
count = cl.get_queue_depth('/', 'summary') # this guy check queue depth
cl.get_messages('/','paperanalyzer') # this guy get messages within queue
而且,您应该有结果后端,您可以通过任务 ID 获取每个任务结果。
我认为根据上述技能,很容易检查芹菜任务的进展情况。
祝你好运:-)
【讨论】: