【问题标题】:Celery multiple queues not working properly. All the tasks are sent to default queue芹菜多个队列无法正常工作。所有任务都发送到默认队列
【发布时间】:2017-09-22 22:09:09
【问题描述】:

我将 Celery 与 Flask 应用程序一起使用,这是我的配置:

app.config['CELERY_TASK_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('processing', Exchange('processing'), routing_key='processing'),
)

app.config['CELERY_TASK_ROUTES'] = {
    'app.tasks.extract_text': {'queue': 'processing', 'routing_key': 'processing'},
    ...

    'app.tasks.vt_notifications': {'queue': 'default', 'routing_key': 'default'},
    ...

    'app.tasks.update_files_from_search': {'queue': 'fast', 'routing_key': 'fast'},
    ...
}


app.config['CELERY_DEFAULT_QUEUE'] = 'default'
app.config['CELERY_DEFAULT_EXCHANGE'] = 'default'
app.config['CELERY_DEFAULT_ROUTING_KEY'] = 'default'

我最终运行了这样的 celery 实例:

celery -A app.tasks.celery worker -Q 'processing' --concurrency 1 -l debug -n processing
celery -A app.tasks.celery worker -Q 'fast' --concurrency 1 -l debug -n fast
celery -A app.tasks.celery worker -Q 'default' --concurrency 1 -l debug -n default

所以,问题在于所有任务都被发送到“默认”队列。非常感谢任何帮助。谢谢!

【问题讨论】:

  • 芹菜版?

标签: python python-2.7 flask celery


【解决方案1】:

如果使用 celery >4,我会推荐几件事: 首先,尝试将name 添加到 您的任务(以确保您在 CELERY_TASK_ROUTES 中使用正确的名称。例如:

@app.task(name='extract_text'])
    def extract_text(..):
        pass

其次,尝试将CELERY_TASK_ROUTES改为:

CELERY_ROUTES = {
    'extract_text': {
        'exchange': 'processing',
        'exchange_type': 'direct',
        'routing_key': 'processing'
    }
}

(而不是queue - 尝试添加exchangeexchange_type)。

最后一点,你不必使用它,只是为了调试,你可以在触发时显式路由任务:

(extract_text.signature(args=(...), queue='processing')).delay()

编辑:

您确定根据需要使用配置吗?这是一个例子:

celery_app = Celery()
celeryconfig = {}
celeryconfig['BROKER_URL'] = 'amqp://'
celeryconfig['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celeryconfig['CELERY_QUEUES'] = (
    Queue('fast', Exchange('fast'), routing_key='fast'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('processing', Exchange('processing'), routing_key='processing'),
)
celeryconfig['CELERY_ROUTES'] = {
    'extract_text': {
        'exchange': 'processing',
        'exchange_type': 'direct',
        'routing_key': 'processing'
    }
}

celery_app.config_from_object(celeryconfig)

【讨论】:

  • 如果我在触发时明确路由任务,它会完美运行。看起来 CELERY_TASK_ROUTES 变量只是被忽略了
  • 您尝试过CELERY_ROUTES 吗?也许你没有根据需要使用配置,我会在一秒钟内更新我的答案
  • 我最终使用了@task(queue='queue_name') 装饰器
猜你喜欢
  • 2016-01-28
  • 2020-02-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-18
  • 1970-01-01
  • 2014-06-09
  • 2019-12-29
  • 1970-01-01
相关资源
最近更新 更多