【问题标题】:How can I asynchronously receive processed messages in Celery?如何在 Celery 中异步接收已处理的消息?
【发布时间】:2018-09-28 17:04:19
【问题描述】:

我正在使用 Celery 编写数据处理管道,因为这会大大加快速度。

考虑以下伪代码:

从 celery.result 导入结果集 from some_celery_app import processing_task # of type @app.task def crunch_data(): 结果 = 结果集([]) 对于 mongo.find() 中的文档:#Around 100K - 1M 个文档 作业 = processing_task.delay(文档) 结果.添加(工作) 返回结果.get() 收集数据 = 紧缩数据() #用收集到的数据做一些事情

我成功地生成了四个启用并发的工作人员,当我运行这个脚本时,数据会得到相应的处理,我可以做任何我想做的事情。

我使用 RabbitMQ 作为消息代理,rpc 作为后端。

我打开 RabbitMQ 管理 UI 时看到的内容:

  1. 首先处理所有文件
  2. 然后,只有这样,集体results.get() 调用才能检索到文档。

我的问题:有没有办法同时进行处理和后续检索?就我而言,由于所有文档都是互不依赖的原子实体,因此似乎无需等待作业完全处理完毕。

【问题讨论】:

    标签: python asynchronous concurrency rabbitmq celery


    【解决方案1】:

    您可以尝试ResultSet.get(callback=cbResult)中的回调参数,然后您可以在回调中处理结果。

    def cbResult(task_id, value):
      print(value)
    results.get(callback=cbResult)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-19
      • 1970-01-01
      • 2021-09-24
      • 2017-02-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多