【发布时间】:2015-04-29 10:51:53
【问题描述】:
我有三个任务:
@app.task(name='timey')
def timey():
print "timey"
while True:
pass
return 1
@app.task(name='endtimey')
def endtimey():
for i in range(10):
print "ENDTIMEY", time()
sleep(3)
return 1
@app.task(name='nexttask')
def nexttask(n):
print "NEXT TASK"
return 1
如果我唯一要做的就是将 endtimey 和 nexttask 链接在一起 -
chain(endtimey.s() | nexttask.s()).delay()
一切都按预期进行。我在芹菜日志中看到ENDTIMEY <current time> 打印十次,然后是NEXT TASK。但是,如果我用无限任务 timey 填满 7 个工人,然后将 endtimey 和 nexttask 链接在一起 -
for i in range(7):
timey.s().delay()
chain(endtimey.s() | nexttask.s()).delay()
所有timey的任务将被8个worker中的7个接走,endtimey会在第8个worker上运行,之后日志会显示nexttask已经收到,但是@987654332 @ 不会运行。
这是为什么?
另外,如果我杀死 celery 服务器然后重新启动它,nexttask 将首先运行。
这是一个人为的例子,但我在一个更复杂的情况下遇到了一个问题,即 celery 工作人员在完成当前任务后没有拿起排队的任务。如果我在这种情况下重新启动 celery,空闲的工作人员将再次开始接任务。
【问题讨论】:
-
这听起来像是一个错误,你有没有用 celery 提交过错误,或者你知道提交过的错误吗?
-
我不是 100% 确定这是一个错误。根据 Chris Ward 的回答,问题在于工作人员提前保留任务,因此如果运行无限任务的工作人员之一保留
nexttask.s(),那么它永远不会被拾取。这似乎是一个有意识的设计决定。
标签: python django rabbitmq celery