【问题标题】:Celery: clean way of revoking the entire chain from within a taskCelery:从任务中撤销整个链的干净方式
【发布时间】:2014-07-10 17:30:41
【问题描述】:

我的问题可能很基本,但我仍然无法在官方文档中找到解决方案。我在我的 Django 应用程序中定义了一个 Celery 链,执行一组依赖于其他的任务:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()

显然第二个和第三个任务需要父级的输出,这就是我使用链的原因。

现在的问题是:如果第一个任务中的测试条件失败,我需要以编程方式撤销第二个和第三个任务。如何以干净的方式做到这一点?我知道我可以从定义链的方法中撤销链的任务(请参阅thisquestion 和 this doc),但 内部 第一个任务我看不到后续任务也不是链条本身。

临时解决方案

我目前的解决方案是跳过根据前一个任务的结果在后续任务中的计算:

@shared_task
def retrieve_public_info(result, x, y):
   if not result:
      return []
   ...

@shared_task
def public_adapter(result, z, x, y):
   for r in result:
       ...

但是这种“解决方法”有一些缺陷:

  • 为每个任务添加不必要的逻辑(基于前任的结果),从而影响重用
  • 仍然执行后续任务,以及所有由此产生的开销

我没有过多地将链的引用传递给任务,因为我担心会搞砸事情。我也承认我还没有尝试过抛出异常的方法,因为我认为不通过链继续进行的选择可能是一种功能性(因此非异常)场景......

感谢您的帮助!

【问题讨论】:

标签: django task celery chain


【解决方案1】:

从 Celery 4.0 开始,我发现正在工作的是使用以下语句从当前 task instance's request 中删除剩余任务:

self.request.chain = None

假设您有一个任务链a.s() | b.s() | c.s()。如果您 bind the task 通过将 bind=True 作为参数传递给任务的装饰器,则只能访问任务内的 self 变量。

@app.task(name='main.a', bind=True):
def a(self):
  if something_happened:
    self.request.chain = None

如果something_happened 为真,bc 将不会被执行。

【讨论】:

  • 应该如何使用?例如:我有一个ID,由chain_task.apply_async()返回,我该怎么做才能撤销这个任务?
  • @ailin 刚刚编辑了答案。我不知道如何通过 id 撤销任务。如果您需要在某些任务中有条件地停止链,则此解决方案很有用。
  • 哦,我明白了,谢谢。顺便说一句,我发现这种方法并不优雅,但对我有帮助:stackoverflow.com/a/23908345/6696051
  • 我想知道这个解决方案有多“官方”;对我来说有点hacky。虽然它似乎有效,但我找不到文档证明这是中止一系列任务的受支持方式......
  • 如果我在一个组中有这个链并且我设置了self.request.chain = None,我只想停止这个链,而不影响并行组的其余部分。该语句是否正确只会影响当前链?
【解决方案2】:

我想我找到了这个问题的答案:this 确实是正确的方法。不过,我想知道为什么在任何地方都没有记录这种常见的情况。

为了完整起见,我发布了基本代码快照:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ....

更新

我实现了一种更优雅的方式来处理这个问题,我想与你分享。我正在使用一个名为revoke_chain_authority 的装饰器,这样它就可以自动撤销链,而无需重写我之前描述的代码。

from functools import wraps

class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")

        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested, e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

这个装饰器可以用在 shared task 上,如下所示:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

注意使用@wraps。必须保留原始函数的签名,否则后者会丢失,celery 会在调用正确的包装任务时弄得一团糟(例如,它总是会调用第一个注册的函数而不是正确的函数)

【讨论】:

    猜你喜欢
    • 2018-02-11
    • 1970-01-01
    • 2023-03-25
    • 1970-01-01
    • 2015-05-06
    • 2016-10-21
    • 1970-01-01
    • 2017-03-27
    • 2019-07-03
    相关资源
    最近更新 更多