【问题标题】:Route celery task to specific queue将 celery 任务路由到特定队列
【发布时间】:2012-04-22 05:11:09
【问题描述】:

我的服务器上运行着两个独立的 celeryd 进程,由supervisor 管理。它们被设置为监听不同的队列,如下所示:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

我的 celeryconfig 看起来像这样:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

所有tasks.sync 任务必须路由到特定队列(因此​​ celeryd 进度)。但是当我尝试使用 sync.apply_async(kwargs={'client': 'value'}, queue='queue1') 手动运行任务时,两个 celery 工人都接过了任务。如何使任务路由到正确的队列并且只由绑定到队列的工作人员运行?

【问题讨论】:

    标签: python celery supervisord


    【解决方案1】:

    你只运行了一个 celerybeat 实例,对吧?

    也许您有与此冲突的旧队列绑定? 尝试运行rabbitmqctl list_queuesrabbitmqctl list_bindings, 可能会重置代理中的数据以从头开始。

    您在此处的示例应该可以工作,并且在我刚尝试时对我有用。

    提示:由于您使用与队列名称相同的 exchange 和 binding_key 值, 您不必在 CELERY_QUEUES 中明确列出它们。当 CELERY_CREATE_MISSING_QUEUES 开启(默认情况下)队列将像您一样自动创建 如果您只是执行celeryd -Q queue1 或将任务发送到未定义的队列。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-08
      • 2013-06-02
      • 2020-04-06
      • 2015-06-16
      • 2017-04-02
      • 2020-05-20
      相关资源
      最近更新 更多