【问题标题】:Task chaining with shift using Celery使用 Celery 进行轮班的任务链
【发布时间】:2021-02-22 16:38:01
【问题描述】:

考虑以下两步算法:

Iteration 1: A1 A2 A3
Iteration 2:    B2 B3

其中任务B{i} 依赖于任务A{i-1}A{i}

问题是如何用 Celery 执行这个工作流,这样

  • 没有A 任务被执行两次。
  • 每个B 任务都可以在它所依赖的两个A 任务完成后立即启动?

我尝试了以下选项,但没有一个具有这两个属性。

选项 1:两组

我已经像这样将两个迭代分成两个单独的组。

result = group([A.s(1), A.s(2), A.s(3)]) | group([B.s(2), B.s(3)])

这个执行的问题是,在B 组可以启动之前,需要完成具有A 任务的整个组。这会导致预期的结果,但不会导致资源的最佳利用。

选项 2:和弦

result = group([
    chord([A.s(1), A.s(2)], B.s(2)),
    chord([A.s(2), A.s(3)], B.s(3))
])

这里的问题是A.s(2) 被调用了两次。我可以在我的应用程序内部管理它,但这需要某种分布式锁和更仔细地处理已经完成的事情和需要完成的事情。执行A.s(2) 两次不是一种选择。任务是幂等的,但执行时间太长。

【问题讨论】:

  • 我相信你可以通过巧妙地使用task links来完成你所需要的。
  • @DejanLekic 你能详细说明一下吗?
  • 对于这类事情,Dask 是比 Celery 更好的工具。你和 Celery 有关系吗?
  • @DHudson 一点也不。我会调查一下。我仍然希望看到答案,因为切换到另一种解决方案会产生成本。
  • @tomas789 试试这个 Dask 解决方案 gist.github.com/DomHudson/7a4d2b2f11186c880be1901b3954cf39。我不知道在 Celery 中执行此操作的最佳方法,所以我没有发布这个问题的答案。

标签: python celery celery-task


【解决方案1】:

你可以尝试和弦本身类似的东西:

@shared_task(name='notify_complete')
def notify_complete(*args, extra_id):
    # get list of result from a1,a2,a3 from args
    # execute b1, b2
    # extra_id: just in case you want to pass any var. you can ignore it.
    pass


chord([A.s(1), A.s(2), A.s(3)], notify_complete.s(extra_id=extra_id))

【讨论】:

  • 函数notify_complete只有在所有A{i}任务完成后才会被调用,对吧?这将使其等同于我在问题中提到的 Option 1 。我的意图不同。
猜你喜欢
  • 2013-11-14
  • 2014-11-03
  • 1970-01-01
  • 2016-03-31
  • 2017-03-27
  • 1970-01-01
  • 1970-01-01
  • 2013-01-07
  • 2017-05-01
相关资源
最近更新 更多