【问题标题】:How to access the orm with celery tasks?如何使用 celery 任务访问 orm?
【发布时间】:2015-10-20 18:18:30
【问题描述】:

我正在尝试使用 sqlalchemy+celery beats 为我的数据库中特定类型的对象翻转一个布尔标志。但是如何从 tasks.py 文件访问我的 orm?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()

我正在使用 celery beat 命令来运行它:

celery -A 任务工作者 -l INFO --beat

但我收到以下错误:

raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

这又指向了 with current_app.app_context() 行

如果我删除 current_app.app_context() 行,我会收到以下错误:

RuntimeError: application not registered on db instance and no application bound to current context

有没有一种特殊的方法可以访问 celery 任务的 flask-sqlalchemy orm?或者对我正在尝试做的事情有更好的方法吗?

到目前为止,唯一可行的解​​决方法是在我的应用程序工厂模式中的 db.init_app(app) 之后添加以下行:

db.app = 应用程序

我正在关注这个 repo 来创建我的 celery 应用 https://github.com/mattupstate/overholt/blob/master/overholt/factory.py

【问题讨论】:

    标签: flask celery flask-sqlalchemy celery-task celerybeat


    【解决方案1】:

    您收到该错误是因为current_app 需要应用上下文才能工作,但您正尝试使用它来设置应用上下文。您需要使用实际应用设置上下文,然后您可以使用current_app

    with app.app_context():
        # do stuff that requires the app context
    

    或者您可以使用the Flask docs 中描述的模式来继承celery.Task,以便它默认了解应用上下文。

    from celery import Celery
    
    def make_celery(app):
         celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
         celery.conf.update(app.config)
         TaskBase = celery.Task
    
         class ContextTask(TaskBase):
             abstract = True
    
             def __call__(self, *args, **kwargs):
                 with app.app_context():
                     return TaskBase.__call__(self, *args, **kwargs)
    
         celery.Task = ContextTask
         return celery
    
     celery = make_celery(app)
    

    【讨论】:

      猜你喜欢
      • 2011-10-07
      • 2023-02-12
      • 2023-03-07
      • 1970-01-01
      • 1970-01-01
      • 2016-12-25
      • 2011-03-22
      • 2016-03-31
      • 1970-01-01
      相关资源
      最近更新 更多