【发布时间】:2018-09-28 17:04:19
【问题描述】:
我正在使用 Celery 编写数据处理管道,因为这会大大加快速度。
考虑以下伪代码:
从 celery.result 导入结果集 from some_celery_app import processing_task # of type @app.task def crunch_data(): 结果 = 结果集([]) 对于 mongo.find() 中的文档:#Around 100K - 1M 个文档 作业 = processing_task.delay(文档) 结果.添加(工作) 返回结果.get() 收集数据 = 紧缩数据() #用收集到的数据做一些事情我成功地生成了四个启用并发的工作人员,当我运行这个脚本时,数据会得到相应的处理,我可以做任何我想做的事情。
我使用 RabbitMQ 作为消息代理,rpc 作为后端。
我打开 RabbitMQ 管理 UI 时看到的内容:
- 首先处理所有文件
- 然后,只有这样,集体
results.get()调用才能检索到文档。
我的问题:有没有办法同时进行处理和后续检索?就我而言,由于所有文档都是互不依赖的原子实体,因此似乎无需等待作业完全处理完毕。
【问题讨论】:
标签: python asynchronous concurrency rabbitmq celery