【问题标题】:Direct to a specific Rabbitmq queue from celery从 celery 直接到特定的 Rabbitmq 队列
【发布时间】:2021-09-17 10:10:58
【问题描述】:

我有一个 RabbitMQ 服务器正在运行,我正在尝试在其中执行一项任务。 我为此使用 celery,并希望通过特定交换定向到特定队列。

芹菜代码

broker_uri='amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672;amqp://xxxx:xxxx@xxxx:5672/'
backend_uri="mongodb+srv://xxxxxx.mongodb.net/celery_test?retryWrites=true&w=majority"

app = Celery('TestApp', broker=broker_uri,backend=backend_uri)

@app.task
def reverse(text):
    sleep(10)
    return text[:-1]

当我运行它时,它会自动访问我尚未定义的队列和交换。 如何更改?

执行结果

- *** --- * --- .> concurrency: 3 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

【问题讨论】:

    标签: rabbitmq celery django-celery


    【解决方案1】:

    您可以使用kombu定义队列。

    from kombu import Queue, Exchange
    sample_queue = Queue(name='sample_queue', exchange=Exchange('sample_exchange', <set exchange type>'topic', durable=True),
                    routing_key='sample')
    

    将任务路由到队列集task_routes

    task_routes = {
        'reverse': {
            'queue': 'sample_queue',
            'routing_key': 'sample',
        },
    }
    

    查看文档exchanges-queues-and-routing-keys

    【讨论】:

      【解决方案2】:

      显然,队列和交换名称可以这样定义,

      app.conf.task_default_queue='queue'
      app.conf.task_default_exchange='exchange'
      app.conf.task_default_routing_key='key'
      
      

      似乎无法指定队列和交换的类型。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2012-04-22
        • 1970-01-01
        • 2011-05-08
        • 1970-01-01
        • 2012-10-21
        • 1970-01-01
        • 2022-06-10
        相关资源
        最近更新 更多