【Question title】: Celery outside of Flask application context
【Posted】: 2018-12-17 15:36:00
【Question】:

I get the following error when running a Celery task, even though I am using the Flask application context:

raised unexpected: RuntimeError('Working outside of application context.\n\nThis typically means that you attempted to use functionality that needed\nto interface with the current application object in some way. To solve\nthis, set up an application context with app.app_context().  See the\ndocumentation for more information.',)

Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/lib/python3.6/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/app/example.py", line 172, in start_push_task
    }, data=data)
  File "/app/push.py", line 65, in push
    if user and not g.get('in_celery_task') and 'got_user' not in g:
  File "/usr/lib/python3.6/site-packages/werkzeug/local.py", line 347, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/lib/python3.6/site-packages/werkzeug/local.py", line 306, in _get_current_object
    return self.__local()
  File "/usr/lib/python3.6/site-packages/flask/globals.py", line 44, in _lookup_app_object
    raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.

This typically means that you attempted to use functionality that needed
to interface with the current application object in some way. To solve
this, set up an application context with app.app_context().  See the
documentation for more information.

Is there any way to fix this?

【Question comments】:

    Tags: python flask celery celery-task


    【Solution 1】:

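    For me, the problem was that I was using "import celery" instead of "from app import celery", i.e. I was importing the Celery package rather than the configured instance created in app.py.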
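
    To see why the instance matters, here is a minimal sketch (Flask only, no Celery) of the failure in the traceback: flask.g can only be read while an application context is pushed, which is what the ContextTask wrapper in app.py arranges. The read_flag helper and the bare app object are hypothetical, for illustration only.

```python
from flask import Flask, g

# Hypothetical stand-in for the real application object.
app = Flask(__name__)


def read_flag():
    # Same kind of lookup as push.py line 65 in the traceback.
    return g.get("in_celery_task")


# Outside any application context the proxy lookup fails.
try:
    read_flag()
    outside = "no error"
except RuntimeError:
    outside = "RuntimeError"

# Inside a pushed application context the same call works.
with app.app_context():
    g.in_celery_task = True
    inside = read_flag()

print(outside, inside)
```

    A task that does not run through the context wrapper presumably behaves like the first call and raises the RuntimeError from the question.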

    Here is more of my setup code, for anyone who stumbles across this later:

    app.py

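    from celery import Celery, Task
    from flask import Flask, g
    
    # Assumed: the Flask app is created here in app.py (not shown in the original snippet).
    app = Flask(__name__)
    
    def make_celery(app):
        # Broker and result backend URLs (RabbitMQ in this setup).
        app.config['broker_url'] = 'amqp://rabbitmq:rabbitmq@rabbit:5672/'
        app.config['result_backend'] = 'rpc://rabbitmq:rabbitmq@rabbit:5672/'
    
        celery = Celery(app.import_name, backend=app.config['result_backend'], broker=app.config['broker_url'])
        celery.conf.update(app.config)
    
        class ContextTask(Task):
            abstract = True
    
            def __call__(self, *args, **kwargs):
                # Run every task inside a Flask context so that g is available.
                with app.test_request_context():
                    g.in_celery_task = True
                    res = self.run(*args, **kwargs)
                    return res
    
        # Make ContextTask the base class for tasks created on this instance.
        celery.Task = ContextTask
        celery.config_from_object(__name__)
        celery.conf.timezone = 'UTC'
        return celery
    
    celery = make_celery(app)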
    

    In another file:

    from app import celery
    

    【Comments】:
