【问题标题】:flask getting celery AsyncResult task via id returns incorrect taskflask 通过 id 获取 celery AsyncResult 任务返回不正确的任务
【发布时间】:2023-02-23 08:39:19
【问题描述】:

根据本教程:https://blog.miguelgrinberg.com/post/using-celery-with-flask,我正在尝试使用烧瓶运行芹菜,然后显示结果。但是在任务成功完成后,flask 应用程序仍然看到一个“挂起”的任务。通过 id 抓取任务的方式显然返回的不是同一个任务对象。

当我连接到 longtask() 函数时,task.state 首先是“PENDING”,然后在 15 秒后它应该是“SUCCESS”。然后 celery worker 也返回结果,所以这部分工作。但是在我通过task = long_task.AsyncResult(task_id) 获取任务的taskstatus() 函数中,task.state 始终保持“待处理”状态,task.info 等其他属性保持None。为什么会发生这种情况以及如何正确访问我的任务对象?

Python 3.8.16 
Flask 2.2.2 
celery 5.2.7
RabbitMQ 3.11.9 

不幸的是,我的系统是 Windows,但一般来说它应该根据this post 工作。所以我这样开始我的芹菜工人:

celery -A app.celery worker --loglevel=info --pool=eventlet

代码:

import time
from flask import Flask, url_for, jsonify
from celery import Celery

app = Flask(__name__)

app.config['CELERY_BROKER_URL'] = 'amqp://celery:celery@localhost:5672/' 
app.config['result_backend'] = 'rpc://celery:celery@localhost:5672/' 

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
celery.set_default()

@celery.task(bind=True)
def long_task(self):
    for i in range(15):
        message = '{0} {1} {2}...'
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': 15,
                                'status': message})
        time.sleep(1)
    return {'status': 'Done'}

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()  # after 15 seconds: task.state == "SUCCESS"
    return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)  # task.state always "PENDING" 
    return jsonify({'result': task.state})

芹菜工人在 15 秒后返回成功:

[2023-02-22 16:06:57,697: INFO/MainProcess] Task app.long_task[37a4e58c-857b-470c-823e-d6b9759458e3] received
[2023-02-22 16:07:11,847: INFO/MainProcess] Task app.long_task[37a4e58c-857b-470c-823e-d6b9759458e3] succeeded in 14.140999999945052s: {'status': 'Done'}

【问题讨论】:

    标签: python celery


    【解决方案1】:

    经过 1.5 天的研究和绝望,我自己找到了答案。原因是 rpc 后端不存储任务(参见 this 帖子),如 celery docs 中所述。

    我仍然想知道,由于 ampq 似乎已被弃用为后端,是否有另一种方法可以使用 rabbitmq 作为存储任务值的后端?或者我现在必须设置 redis 或 mysql 吗?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-03-19
      • 1970-01-01
      • 2021-04-01
      • 2019-05-25
      • 2016-10-14
      • 2021-01-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多