【发布时间】:2012-08-14 00:07:27
【问题描述】:
在 Celery 2 中,我有一个 TaskSet,其设置如下:
for (item,jobId) in itemsAndJobs:
tasks.append(waitForOutput.subtask((jobId,item)))
job = TaskSet(tasks)
result = job.apply_async()
然后,我会通过检查来检查 TaskSet 是否已完成:
job.ready() and job.successful()
这很好——waitForOutput 任务会慢慢移动,然后当它们都完成后,作业检查就会完成。我可以检查多次没有任何问题。
在 celery 3 中,我尝试了一种快速而肮脏的方法,只需更改即可将其更改为组
TaskSet(tasks)
到
group(tasks)
除非我等到所有 waitForOutput 任务完成后才检查是否准备就绪并成功,否则这永远不会奏效。 ready() 总是返回 false。我添加了一些日志记录和 30 秒的默认重试,这就是我所看到的-
- 启动 5 个 waitForOutput 作业
- 检查 ready(),没有 waitForOutput 作业完成,ready 为假
- 2 个 waitForOutput 作业完成
- 检查ready(),ready为false,job.check_completed()为2
- 检查ready(),ready为假,job.check_completed()为0
- 剩余 3 个 waitForOutput 作业已完成
- 检查ready(),ready为false,job.check_completed()为3
如果我使用 Celery 3 代码并从任务中导入 TaskSet 并使用它而不是组,我会看到相同的行为。
我很乐意被告知我只是错误地使用了组!
【问题讨论】:
标签: celery