【发布时间】:2012-06-01 19:40:32
【问题描述】:
问题
我使用 celery 启动如下所示的任务集:
- 我执行了一批可以并行运行的任务,这批任务的数量从几十到几千不等。
-
我将这些任务的结果汇总为单个答案,然后对这个答案做一些事情——比如存储到数据库、保存到特殊的结果文件等等。基本上在执行完任务后,我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list): #store in file def callback(entity_key, task_result_list): #store in db
现在第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
这种方法很麻烦,因为我必须停止单个线程,直到所有任务都执行完毕(这可能需要几个小时)。
我也想以某种方式将第 2 步移到 celery --- 基本上我需要向整个任务集添加一个回调(据我所知它在 Celery 中不受支持)或提交一个在所有这些之后执行的任务子任务。
有人知道怎么做吗?我在 django 环境中使用它,所以我可以在数据库中存储一些状态。
总结一下我最近的发现
和弦不行
我不能直接使用和弦,因为和弦使我能够创建这样的回调:
def callback(task_result_list):
#store in file
没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。
也使用数据库
我可以使用TaskSetMeta 来存储结果,但是这个实体没有状态字段 --- 所以即使我要向 TaskSetMeta 添加一个信号,我也必须汇集任务结果,这可能会产生巨大的开销。
【问题讨论】:
-
你试过chords吗?
-
我研究了和弦,我认为他们不会这样做。我想将结果存储在数据库或文件或其他任何东西中。在这种情况下,chord 需要传递两个参数,首先是报告文件名,或者任务实体第二个参数的一些细节将被连接 args。截至今天,芹菜和弦仅列出结果。还是我错了?