【发布时间】:2023-02-23 08:39:19
【问题描述】:
根据本教程:https://blog.miguelgrinberg.com/post/using-celery-with-flask,我正在尝试使用烧瓶运行芹菜,然后显示结果。但是在任务成功完成后,flask 应用程序仍然看到一个“挂起”的任务。通过 id 抓取任务的方式显然返回的不是同一个任务对象。
当我连接到 longtask() 函数时,task.state 首先是“PENDING”,然后在 15 秒后它应该是“SUCCESS”。然后 celery worker 也返回结果,所以这部分工作。但是在我通过task = long_task.AsyncResult(task_id) 获取任务的taskstatus() 函数中,task.state 始终保持“待处理”状态,task.info 等其他属性保持None。为什么会发生这种情况以及如何正确访问我的任务对象?
Python 3.8.16
Flask 2.2.2
celery 5.2.7
RabbitMQ 3.11.9
不幸的是,我的系统是 Windows,但一般来说它应该根据this post 工作。所以我这样开始我的芹菜工人:
celery -A app.celery worker --loglevel=info --pool=eventlet
代码:
import time
from flask import Flask, url_for, jsonify
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'amqp://celery:celery@localhost:5672/'
app.config['result_backend'] = 'rpc://celery:celery@localhost:5672/'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
celery.set_default()
@celery.task(bind=True)
def long_task(self):
for i in range(15):
message = '{0} {1} {2}...'
self.update_state(state='PROGRESS',
meta={'current': i, 'total': 15,
'status': message})
time.sleep(1)
return {'status': 'Done'}
@app.route('/longtask', methods=['POST'])
def longtask():
task = long_task.apply_async() # after 15 seconds: task.state == "SUCCESS"
return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = long_task.AsyncResult(task_id) # task.state always "PENDING"
return jsonify({'result': task.state})
芹菜工人在 15 秒后返回成功:
[2023-02-22 16:06:57,697: INFO/MainProcess] Task app.long_task[37a4e58c-857b-470c-823e-d6b9759458e3] received
[2023-02-22 16:07:11,847: INFO/MainProcess] Task app.long_task[37a4e58c-857b-470c-823e-d6b9759458e3] succeeded in 14.140999999945052s: {'status': 'Done'}
【问题讨论】: