【问题标题】:Callback for celery apply_async芹菜 apply_async 的回调
【发布时间】:2012-09-21 08:09:48
【问题描述】:

我在我的应用程序中使用celery 来运行定期任务。让我们看下面的简单示例

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

正如您在上面的示例中看到的,我使用celery 来运行异步任务,但是(因为它是一个队列)我需要在do_stuffqueue.ack(uid) 出现异常的情况下执行queue.fail(uid) 否则。在这种情况下,在这两种情况下从我的任务中获得一些回调是非常清楚和有用的 - on_failureon_success

我看到了一些 documentation,但从未见过将回调与 apply_async 一起使用的做法。有可能吗?

【问题讨论】:

    标签: python celery


    【解决方案1】:

    继承Task类并重载on_success和on_failure函数:

    class CallbackTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            pass
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            pass
    
    
    @celery.task(base=CallbackTask)  # this does the trick
    def add(x, y):
        return x + y
    

    【讨论】:

    • 如何像在 args 和 kwargs 中一样将参数传递给 on_success 或 on_failure?
    • 这不会在任务调度端回调。
    • 那么一旦celery worker成功完成,它会调用on_success方法吗?
    【解决方案2】:

    您可以在调用 apply_async 时通过 link 和 link_err kwargs 指定成功和错误回调。 celery 文档包含一个明确的示例:http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

    【讨论】:

      猜你喜欢
      • 2022-06-17
      • 2016-05-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-10
      • 1970-01-01
      • 2016-10-20
      相关资源
      最近更新 更多