【问题标题】:Celery: Callback after task hierarchy芹菜:任务层次结构后的回调
【发布时间】:2013-05-20 04:22:36
【问题描述】:

我正在使用 webapp 中的 Celery 来启动任务层次结构。

任务

我正在使用以下任务:

  • task_a
  • task_b
  • task_c
  • notify_user

一个 Django 视图启动多个 task_a 实例。他们每个人都进行一些处理,然后启动几个task_b 实例。每一个都进行一些处理,然后启动几个 task_c 实例。

可视化:

目标

我的目标是执行所有任务,并在整个层次结构完成后立即运行回调函数。此外,我希望能够将数据从最低任务传递到顶层。

  1. 视图应该只是“启动”任务然后返回。
  2. 每个子任务都依赖于父任务。父任务不直接依赖于子任务。父任务启动所有子任务后,即可停止。
  3. 只要父任务在子任务启动之前运行,一切都可以并行化。
  4. 所有任务完成后,应调用notify_user回调函数。
  5. notify_user 回调函数需要访问来自task_cs 的数据。

所有任务都应该是非阻塞的,因此task_b 不应等待所有task_c 子任务完成。

实现上述目标的正确方法是什么?

【问题讨论】:

    标签: python concurrency celery


    【解决方案1】:

    解决方案原来是这个拉取请求中提供的动态任务功能:https://github.com/celery/celery/pull/817。这样,每个任务可以返回一组子任务,然后替换队列中的原始任务。

    【讨论】:

    • Danilo 引用的动态任务功能非常强大。它没有并入当前 (3.0.21) 版本的 Celery,但可以轻松添加到本地 Celery 部署中。
    【解决方案2】:

    假设您有以下任务:

    celery = Celery(
        broker="amqp://test:test@localhost:5672/test"
    )
    celery.conf.update(
        CELERY_RESULT_BACKEND = "mongodb",
    )
    
    
    @celery.task
    def task_a(result):
        print 'task_a:', result
        return result
    
    @celery.task
    def task_b(result):
        print 'task_b:', result
        return result
    
    @celery.task
    def task_c(result):
        print 'task_c:', result
        return result
    
    @celery.task
    def notify_user(result):
        print result
        return result
    

    对于给定的输入数据(如您绘制的那样):

        tree = [
            [["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
        ]
    

    你可以这样做:

        a_group = []
        for ia, a in enumerate(tree):
            print "A%s:" % ia
            b_group = []
            for ib, b in enumerate(a):
                print " - B%s:" % ib
                for c in b:
                    print '   -', c
    
                c_group = group([task_c.s(c) for c in b])
    
                b_group.append(c_group | task_b.s())
    
            a_group.append(group(b_group) | task_a.s())
    
        final_task = group(a_group) | notify_user.s()
    

    它的表示是(不要读它,它很丑:)

    [[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()
    

    传递给 notify_user 的数据是:

    [[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
    

    一切都通过回调(和弦)运行,因此没有任务等待其他任务。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-05-14
      • 1970-01-01
      • 1970-01-01
      • 2018-03-11
      • 1970-01-01
      • 2015-01-21
      相关资源
      最近更新 更多