【问题标题】:Flask celery tasks not working烧瓶芹菜任务不起作用
【发布时间】:2015-12-16 11:59:29
【问题描述】:

我配置了我的项目,参考了这个答案: How to use Flask-SQLAlchemy in a Celery task

我的extension.py 文件:

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab


class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()

然后在我的app_settings.py 中,我创建了这个应用程序:

app = Flask('app', instance_relative_config=True)

并配置芹菜:

celery.init_app(app)

我使用 python manage.py run 运行了烧瓶项目:

app.run(
        debug=settings.get('DEBUG', False),
        host=settings.get('HOST', '127.0.0.1'),
        port=settings.get('PORT', 5000)
    )

然后跑了芹菜:

celery -A manage.celery worker --beat -l debug

芹菜原木看起来不错:

[tasks]
  . app.api.tasks.spin_file
  . app.main.tasks.send_async_email
  . celery.backend_cleanup
  . celery.chain
  ...

然后在views.py,我调用这个任务:

send_async_email.delay(*args, **kwargs)

但是 Celery 忽略了所有任务。没有任何反应,没有错误,没有警告。没有。我做错了什么?

编辑:当我用这个命令启动 celery 时:celery -A manage.celery worker --beat -l debug

我收到以下警告:

[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
  pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),

【问题讨论】:

    标签: python configuration flask celery celery-task


    【解决方案1】:

    我不确定这是否会对您有所帮助,但是当我需要 celery 时,我会在我的许多项目中使用此代码:

    from flask import Flask, request, jsonify as jsn
    from celery import Celery
    app = Flask(__name__)
    app.config.update(dict(
        SECRET_KEY='blabla'
        )
    )
    # Celery configuration
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'database'
    app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
    app.config['CELERY_TRACK_STARTED'] = True
    app.config['CELERY_SEND_EVENTS'] = True
    
    # Initialize Celery
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    
    @celery.task
    def do_something(data):
    
        from celery import current_task
        import os
        import subprocess
        with app.app_context():
            #run some bash script with some params in my case
    

    然后我通过以下方式与主管一起运行 celery:

    #!/bin/bash
    cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file
    

    当然,在我的路线上,我有类似的东西

    @app.route('/someroute', methods=["POST"])
    def someroute():
        result = do_something.delay(data)
        print result.id
    

    【讨论】:

    • 伙计,我这边没有任何效果,试图遵循你的方法。 Celery worker 和 heartbeat 运行,但数据库中没有存储任何内容
    • 当我使用上面的示例时,它工作正常。但是,如果我在自己的代码中运行它,我需要 redis (ImportError: Missing redis library (pip install redis))。为什么会出现这种情况?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-04-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多