【问题标题】:How to call a task at the end of the execution of each chunk in celery?如何在芹菜中每个块的执行结束时调用任务?
【发布时间】:2013-06-17 08:37:31
【问题描述】:

我正在尝试在完成大量任务时更新一些聚合数据。它基本上是在块的末尾实现和弦的概念。我该如何实现?

【问题讨论】:

    标签: celery celery-task djcelery


    【解决方案1】:

    好的,我终于这样解决了。

    我首先将传入的 arg_collection 分解为块大小的子集合。然后,我们为它们中的每一个应用一个和弦。

    
    
        # required util
        def n_sized_parts(inp_iterable, n):
             return (inp_iterable[i:i+n] for i in xrange(0,len(inp_iterable),n))
    
        # defintion of tasks
        @task
        def gen_task(arg1, arg2):
            return arg1 + arg2
    
        @task
        def any_callback_task_for_end_of_chunk(some_arg):
            return some_arg * 10
            # this is a trivial example. 
            # usually, you will want to do something better.
    
        @task
        def apply_chord_to_chunk(arg_collection, *args, **kwargs):
            task_list = []
            callback_arg = kwargs.pop('callback_arg')
    
            for part in n_sized_parts(arg_collection, size_of_each_chunk):
                 task_list.append(chord(gen_task.starmap(part))(any_callback_task_for_end_of_chunk.si(callback_arg))))
    
             return task_list
    
        # invocation
        apply_chord_to_chunk.s(zip(xrange(100),xrange(100)), callback_arg='asdgag')
    
    

    如果您在此解决方案的任何地方发现问题,请告诉我。我会快速更新任何需要的编辑。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-01-22
      • 1970-01-01
      • 2021-12-16
      • 2018-03-07
      • 2014-11-29
      • 1970-01-01
      • 2022-01-21
      • 2020-10-07
      相关资源
      最近更新 更多