【发布时间】:2020-06-29 16:18:46
【问题描述】:
我一直在尝试使用 celery 执行管道。初始任务应创建要处理的项目列表,我将使用 group 进一步并行化每个项目处理。最后我应该从小组任务中收集结果。
@app.task()
def prepare():
return [item1, item2, item3]
@app.task()
def parallel_process(items, additional_param):
return group(process.s(i, additional_param) for i in items)() # I get an error kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable
@app.task()
def process(i, param):
return mapping_func(item, param)
@app.task()
def collect(results):
print(results)
pipeline = prepare.s() | parallel_process.s(param) | collect.s()
pipeline.apply_async()
我得到一个错误kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable
处理任务被调用,但收集任务没有。最终的结果永远不会到来。有没有其他方法可以做到这一点?无法在线找到合适的示例。
【问题讨论】:
标签: celery