【问题标题】:Perform celery task after successful commit in Flask?在 Flask 中成功提交后执行 celery 任务?
【发布时间】:2018-03-25 22:51:08
【问题描述】:

运行时间相对较长的任务被委派给在另一台服务器上单独运行的 celery worker。

但是,结果被添加回关系数据库(根据task_descr.id 作为键更新表,见下文),worker 使用ignore_result

从 Flask 应用程序请求的任务:

task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])

问题是在 Flask 端的事务尚未关闭时请求任务。这会导致竞争条件,因为有时 celery worker 在 Flask 应用程序中的事务结束之前完成任务。

仅在成功交易后发送任务的正确方法是什么?

或者工人应该在尝试有条件的UPDATE之前检查task_descr.id的可用性并重试任务(这感觉太复杂的安排)?

Run function after a certain type of model is committed 的回答讨论了类似的情况,但这里任务发送是明确的,因此不需要在某些模型中监听更新/插入。

【问题讨论】:

    标签: flask transactions celery race-condition


    【解决方案1】:

    其中一种方法是Per-Request After-Request Callbacks,感谢 Armin Ronacher:

    from flask import g
    
    def after_this_request(func):
        if not hasattr(g, 'call_after_request'):
            g.call_after_request = []
        g.call_after_request.append(func)
        return func
    
    
    @app.after_request
    def per_request_callbacks(response):
        for func in getattr(g, 'call_after_request', ()):
            response = func(response)
        return response
    

    在我的例子中,用法是嵌套函数的形式:

        task_desc = ...
        attachments = ...
        #...
        @after_this_request
        def send_mytask(response):
            if response.status_code in {200, 302}:
                task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])
            return response
    

    不理想,但有效。我的任务只针对成功服务的请求,所以我不关心 500s 或其他错误情况。

    【讨论】:

    • 使用flask g来处理这个很好吗,以及你如何将task_descr.id传递给这个函数send_mytask
    • Armin Ronacher 是 Fl​​ask 的创造者,那么为什么这不应该是生产就绪的呢?关于g:flask.palletsprojects.com/en/1.0.x/api/#flask.g——这里是存储请求时间数据的地方。
    • 变量来自于外部函数或方法。函数send_mytask 是一个纯粹用于after_this_request 装饰器的闭包。
    猜你喜欢
    • 1970-01-01
    • 2019-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多