【问题标题】:Get celery task ids in advanced workflow在高级工作流程中获取 celery 任务 ID
【发布时间】:2014-05-08 21:22:09
【问题描述】:

我需要实现以下场景:

  • 执行任务 A
    • 使用不同的参数并行执行多个任务 B
    • 等待所有任务完成
    • 使用不同的参数并行执行多个任务 B
    • 等待所有任务完成
    • 执行任务 C

我通过实现和弦链实现了这一点,这里是简化的代码:

# 在 ATask 的 run() 方法中 和弦链 = [] 对于 taskB.groups.all() 中的 taskB_group: 任务 = [BTask().si(id=taskB_model.id) for taskB_model in taskB_group.children.all()] 如果 len(任务): chord_chain.append(chord(tasks, _dummy_callback.s())) chord_chain.append(CTask().si(execution_id)) 链(和弦链)()

问题是我需要能够在任何时间点对所有 BTask 调用 revoke(terminate=True)。较低级别的问题是我无法访问 BTask celery ids。

  1. 尝试通过链 result = chain(chord_chain)() 获取 BTask id。但我没有在返回的 AsyncResult 对象中找到该信息。是否可以从此对象获取链子 ID? (result.children 为无)
  2. 尝试通过 ATask AsyncResult 获取 BTask id,但似乎 children 属性只包含 first 和弦的结果,而不包含其余任务的结果。
>>> r=AsyncResult(#ATask.id#) >>> r.children [, ]

【问题讨论】:

    标签: python celery


    【解决方案1】:

    通过使用中止状态标志标记 ATask 相关模型并在 BTask 开始时添加检查来解决。

    【讨论】:

      猜你喜欢
      • 2011-03-19
      • 1970-01-01
      • 1970-01-01
      • 2013-12-09
      • 2017-03-25
      • 2012-10-01
      • 2021-04-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多