【问题标题】:Flask Celery update_state from inside another functionFlask Celery update_state 来自另一个函数
【发布时间】:2017-03-08 16:08:01
【问题描述】:

我想从另一个函数更新我的 Celery 任务的状态。这是我现在拥有的:

路线

@app.route('/my-long-function', methods=['POST'])
def my_long_function():

    param1 = request.form['param1']
    param2 = request.form['param2']

    task = outside_function.delay(param1, param2)

    return task.id

Celery 任务 - 在后台启动 some_python_script.handle

@celery.task(name='outside_function')
def outside_function(param1, param2):
    with app.app_context():
        some_python_script.handle(param1, param2)

some_python_script.handle:

def handle(param1, param2):
    param1 + param2
    # many, many different things

理想情况下,我希望能够 self.update_state 芹菜任务,以便我可以轻松地从我的应用中请求其状态,如下所示:

some_python_script.handle(理想情况下):

def handle(param1, param2):
    param1 + param2
    # many, many different things
    self.outside_function.update_state('PROGRESS', meta = {'status':'progressing'})

检查进度(理想情况下):

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = outside_function.AsyncResult(task_id)
    response = {
    'state': task.state,
    'id': task.id,
    'status' : task.status,
    }

    return jsonify(response)

或类似的东西。非常感谢任何帮助,我对 Celery 很陌生!

【问题讨论】:

    标签: flask celery celery-task


    【解决方案1】:

    您应该声明调用的任务 ID。 您可以查看update_state

    下面的代码应该可以工作。

    # capture id of celery task
    ID = self.request.id
    
    def handle(param1, param2):
        param1 + param2
        # many, many different things
        # update the state of celery task with direct reference to it
        self.update_state(task_id=ID, state='PROGRESS', meta = {'status':'progressing'})
    

    【讨论】:

    • 它不能工作,因为在handle的范围内没有self
    【解决方案2】:

    如果您将bind=True 添加到主 celery 任务,您可以使用 self 关键字访问 celery 任务对象。使用self 将任务对象传递给下一个函数。

    @celery.task(name='outside_function', bind=True)
    def outside_function(self, param1, param2):
        with app.app_context():
            some_python_script.handle(self,param1, param2)
    

    另一个函数可以接受这个任务对象并更新它:

    def handle(celery_task, param1, param2):
        param1 + param2
        # many, many different things
        celery_task.update_state('PROGRESS', meta = {'status':'progressing'})
    

    【讨论】:

      【解决方案3】:

      使用celery_app.backend.store_result(task_id, state, result) 可以解决这个问题 store_resultupdate_state使用

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2012-02-16
        • 1970-01-01
        • 1970-01-01
        • 2013-12-29
        • 1970-01-01
        • 2020-08-11
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多