【问题标题】:Celery long running task state goes back to pending despite task_track_started=True尽管 task_track_started=True
【发布时间】:2021-05-05 16:04:05
【问题描述】:

有很多类似的问题,但我无法弄清楚我的情况发生了什么。

我正在使用 celery 对必须按顺序执行的长时间运行的任务进行排队。

我以这种方式启动任务:

task = mytask.apply_async(
   args=[myargs],
   task_id=my_custom_task_id,
   queue="gpu_queue",
)

我以这种方式启动工人:

celery -A celery_app worker -Q gpu_queue --loglevel=INFO --concurrency=1 -E

mytask里面我经常打电话给current_task.update_state(state="MYCUSTOMSTATE", meta={'customkey': 'customvalue'})

我尝试通过以下方式监控任务状态:

task = celery_app.AsyncResult(task_id)
task.status
task.result  # contains the the meta dict

一切正常,除了队列中有多个任务时,正在运行的任务随机(?)报告PENDING作为它的状态和一个空的元字典,即使我使用@987654327 @celeryconfig.py。这非常令人沮丧,因为除了虚假报告之外,我对这一切的运作方式感到非常满意。有解决办法吗?芹菜是不是适合这项工作的工具?

我能想到的唯一解决方法是让我的监控应用程序在收到任务已经开始的信息时不切换显示 PENDING 状态,但这感觉很hacky。

MWE

app.py

​​>
import time

from celery import Celery, current_task

app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    current_task.update_state(state="Running")
    for i in range(5):
        time.sleep(1)
    return x + y

watch.py​​

​​>
import time

from app import app, add

task = add.apply_async(
    args=[1, 2],
    task_id="task1",
)

print("task1 alone")

for _ in range(5):
    print("task1", app.AsyncResult("task1").status)
    time.sleep(0.5)

task = add.apply_async(
    args=[3, 4],
    task_id="task2",
)

print("task2 was launched")

while not app.AsyncResult("task2").ready():
    print("task1", app.AsyncResult("task1").status)
    print("task2", app.AsyncResult("task2").status)
    time.sleep(0.5)

celeryconfig.py

​​>
task_track_started=True

docker run -p 5672:5672 rabbitmq

celery --config=celeryconfig -A app worker --loglevel=INFO --concurrency=1 -E

python watch.py

输出:

task1 alone
task1 PENDING
task1 Running
task1 Running
task1 Running
task1 Running
task2 was launched
task1 Running
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 SUCCESS
task2 Running
task1 PENDING
task2 Running
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING
task1 PENDING
task2 PENDING

【问题讨论】:

  • 你是说它报告 STARTED 状态,然后是 PENDING?如果是这种情况,那么我会说这是一个应该报告的错误,或者他们应该更新文档,说 Celery 可能会在 STARTED 之后报告 PENDING 状态......在你的情况下,我希望 MYCUSTOMSTATE报告而不是 PENDING。
  • 我什至看不到 STARTED 状态,因为我每隔 1 秒轮询一次以显示状态,所以我只看到 MYCUSTOMSTATE,然后在我启动第二个任务时再次看到 PENDING。偶尔,我会再次看到 MYCUSTOMSTATE。但是任务肯定正在处理,如果我只是等待它完成然后第二个开始。所以你说这不应该发生。我必须在 MWE 上工作以重现该错误报告。谢谢你的回答。

标签: python celery


【解决方案1】:

通过使用 redis 作为 celery 后端并将result_persistent=True 添加到 conf 中解决了我的问题。我不确定我遇到的是 celery 的错误还是 RPC 后端的预期行为。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-04-23
    • 2015-07-09
    • 1970-01-01
    • 2017-03-07
    • 2017-12-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多