【问题标题】:Run a group task from previous task list result从先前的任务列表结果运行组任务
【发布时间】:2020-06-29 16:18:46
【问题描述】:

我一直在尝试使用 celery 执行管道。初始任务应创建要处理的项目列表,我将使用 group 进一步并行化每个项目处理。最后我应该从小组任务中收集结果。

@app.task()
def prepare():
    return [item1, item2, item3]

@app.task()
def parallel_process(items, additional_param):
    return group(process.s(i, additional_param) for i in items)() # I get an error kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable

@app.task()
def process(i, param):
    return mapping_func(item, param)

@app.task()
def collect(results):
    print(results)

pipeline = prepare.s() | parallel_process.s(param) | collect.s()
pipeline.apply_async()

我得到一个错误kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable

处理任务被调用,但收集任务没有。最终的结果永远不会到来。有没有其他方法可以做到这一点?无法在线找到合适的示例。

【问题讨论】:

    标签: celery


    【解决方案1】:

    根据您的错误信息,您应该将CELERY_RESULT_SERIALIZER 更改为pickle,因为GroupResult 类型不是JSON 可序列化的。

    参考:https://docs.celeryproject.org/en/stable/userguide/configuration.html#result-serializer

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-04-07
    • 2013-02-11
    • 1970-01-01
    • 2018-04-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多