【Question title】: How does a Celery worker consuming from multiple queues decide which to consume from first?
【Posted】: 2013-01-24 11:41:49
【Question】:

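I am using Celery to perform asynchronous background tasks, with Redis as the backend. I'm interested in how a Celery worker behaves in the following situation:

I am running a worker as a daemon using celeryd. This worker has been assigned two queues to consume from through the -Q option: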

celeryd -E -Q queue1,queue2

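How does the worker decide where to fetch the next task from? Does it consume tasks from queue1 and queue2 at random? Does it prioritise fetching from queue1 because it comes first in the list of arguments passed to -Q?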

【Comments】:

    Tags: scheduled-tasks celery celeryd


    【Answer 1】:

    From my testing, it processes multiple queues in round-robin fashion.

    If I use this test code:

    from celery import task
    import time
    
    
    @task
    def my_task(item_id):
        time.sleep(0.5)
        print('Processing item "%s"...' % item_id)
    
    
    def add_items_to_queue(queue_name, items_count):
        for i in range(items_count):  # range works on both Python 2 and 3
            my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)
    
    
    add_items_to_queue('queue1', 10)
    add_items_to_queue('queue2', 10)
    add_items_to_queue('queue3', 5)
    

    and then start the worker (using django-celery) with:

    `manage.py celery worker -Q queue1,queue2,queue3`
    

    it outputs:

    Processing item "queue1-0"...
    Processing item "queue3-0"...
    Processing item "queue2-0"...
    Processing item "queue1-1"...
    Processing item "queue3-1"...
    Processing item "queue2-1"...
    Processing item "queue1-2"...
    Processing item "queue3-2"...
    Processing item "queue2-2"...
    Processing item "queue1-3"...
    Processing item "queue3-3"...
    Processing item "queue2-3"...
    Processing item "queue1-4"...
    Processing item "queue3-4"...
    Processing item "queue2-4"...
    Processing item "queue1-5"...
    Processing item "queue2-5"...
    Processing item "queue1-6"...
    Processing item "queue2-6"...
    Processing item "queue1-7"...
    Processing item "queue2-7"...
    Processing item "queue1-8"...
    Processing item "queue2-8"...
    Processing item "queue1-9"...
    Processing item "queue2-9"...
    

    So it pulls one item from each queue before moving on to the next queue1 item, even though all of the queue1 tasks were published before the queue2 and queue3 tasks.
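
    The interleaving in the log above can be reproduced with a small pure-Python model. This is only a sketch of round-robin polling, not Celery's or the transport's actual implementation; the function name `round_robin` and the polling order queue1, queue3, queue2 are assumptions chosen to match the observed output.

```python
from collections import deque


def round_robin(queues, order):
    """Yield items by taking one from each non-empty queue in turn.

    `queues` maps a queue name to its pending items; `order` is the
    polling order (an assumption, picked to match the log above).
    """
    pending = {name: deque(items) for name, items in queues.items()}
    while any(pending.values()):
        for name in order:
            if pending[name]:
                yield pending[name].popleft()


queues = {
    "queue1": ["queue1-%d" % i for i in range(10)],
    "queue2": ["queue2-%d" % i for i in range(10)],
    "queue3": ["queue3-%d" % i for i in range(5)],
}
consumed = list(round_robin(queues, ["queue1", "queue3", "queue2"]))
for item in consumed:
    print('Processing item "%s"...' % item)
```

    Once queue3 is empty, the model simply alternates between queue1 and queue2, which is what the tail of the log shows.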

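    Note: as @WarLord points out, this exact behaviour only holds when CELERYD_PREFETCH_MULTIPLIER is set to 1. If it is greater than 1, items are fetched from the queue in batches. So with 4 processes and PREFETCH_MULTIPLIER set to 4, 16 items are pulled from the queue right away, and you won't get the exact output above, although it will still roughly follow round-robin.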
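
    The arithmetic in the note can be written out as follows. The helper name `prefetched_messages` is hypothetical; it only illustrates that the number of messages reserved up front is the number of worker processes times the prefetch multiplier. In Celery 4 and later the same setting is spelled `worker_prefetch_multiplier`.

```python
def prefetched_messages(concurrency, prefetch_multiplier):
    """Messages a worker may reserve up front: one batch per process."""
    return concurrency * prefetch_multiplier


# 4 processes with a multiplier of 4: 16 messages are reserved at once,
# so strict one-by-one round-robin ordering is no longer visible.
print(prefetched_messages(4, 4))

# 4 processes with a multiplier of 1: each process reserves one message.
print(prefetched_messages(4, 1))
```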

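    【Comments】:

    • In 2019 this is no longer the correct answer; see the correct, updated answer from @crazyshezy. The order is now based on arrival in the queue.
    【Answer 2】:

    Note: this answer is deprecated: the latest versions of Celery work very differently from how they did in 2013...

    A worker consuming from several queues consumes tasks in FIFO order, and that FIFO order is maintained across the queues as well.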

    Example:

    Queue1: (t1, t2, t5, t7)
    Queue2: (t0, t3, t4, t6)

    Assume 0-7 represents the order in which the tasks were published.

    The order of consumption is t0, t1, t2, t3, t4, t5, t6, t7
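
    As a sketch of what FIFO across queues means for this example, the two queues can be merged by publish index. The tuple layout and variable names are assumptions made for illustration; this models the ordering only, not how a broker delivers messages.

```python
import heapq

# Each task is (publish_index, name); each queue is already in publish order.
queue1 = [(1, "t1"), (2, "t2"), (5, "t5"), (7, "t7")]
queue2 = [(0, "t0"), (3, "t3"), (4, "t4"), (6, "t6")]

# heapq.merge interleaves the sorted queues by publish index,
# i.e. global FIFO order regardless of which queue a task sits in.
fifo_order = [name for _, name in heapq.merge(queue1, queue2)]
print(fifo_order)
```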

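    【Comments】:

    • Is there a way to modify this behaviour? Perhaps by giving one queue priority over another: t1, t2, t5, t7, t0, t3, t4, t6.
    • Not with RabbitMQ, but it is possible with other message transports, e.g. Redis, although that would involve subclassing. Note that the order you describe is effectively starvation: if Queue1 is always busy, the messages in Queue2 will never be processed.
    • @Crazyshezy - are you saying that if I put 500 items into queue 1 and then 1 item into queue 2, the queue 2 item won't be processed until the 500 items are done, because it arrived after them? That seems to defeat the purpose of having multiple queues if they are treated as just one queue...
    • From my testing, it processes the queues in round-robin fashion, so in your example the consumption order would be t1, t0, t2, t3, t5, t4, t7, t6. However, if you set PREFETCH_MULTIPLIER to something other than 1 and the tasks take different amounts of time to complete, the order may vary.
    • According to github.com/celery/celery/issues/2192#issuecomment-51799123 it is decided by the broker. So RabbitMQ seems to deliver in FIFO fashion, and I guess other brokers may do round-robin.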
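
    The strict-priority order asked about in the comments above, and the starvation it implies, can be modelled in a few lines. This is a hypothetical sketch (the name `strict_priority` is made up), not something Celery provides: the consumer always takes from the first non-empty queue, so a lower-priority queue is only served once every queue ahead of it is empty.

```python
from collections import deque


def strict_priority(queues):
    """Yield items, always from the first non-empty queue in `queues`.

    `queues` is a list of deques, highest priority first.
    """
    while any(queues):
        for q in queues:
            if q:
                yield q.popleft()
                break  # restart from the highest-priority queue


queue1 = deque(["t1", "t2", "t5", "t7"])
queue2 = deque(["t0", "t3", "t4", "t6"])
priority_order = list(strict_priority([queue1, queue2]))
print(priority_order)
```

    If queue1 were refilled as fast as it is drained, the loop would never reach queue2, which is the starvation described above.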
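    【Answer 3】:

    Using the pyamqp broker library pointed at a rabbitmq server, tasks are processed in round-robin fashion. See the proof below.

    It seems that the order in which tasks are processed is determined by the broker library and not by the actual backend (rabbitmq vs redis is not the issue).

    Software versions: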

    $ pip freeze | egrep "celery|kombu|amqp"
    amqp==2.5.2
    celery==4.4.2
    kombu==4.6.8
    
    from time import sleep
    
    @app.task
    def sleepy(name):
        print(f"Processing: {name}")
        sleep(0.5)
    

    Then, in another shell, queue up the tasks:

    from time import sleep
    
    def queue_them():
        for x in range(50):
            sleepy.apply_async(args=(f"Q1-T{x}",), queue="Q1")
        sleep(0.1)
        for x in range(20):
            sleepy.apply_async(args=(f"Q2-T{x}",), queue="Q2")
        sleep(0.1)
        sleepy.apply_async(args=("Q3-T0",), queue="Q3")
        for x in range(30):
            sleepy.apply_async(args=(f"Q2MOAR-T{x}",), queue="Q2")
    
    # setup - get celery to setup the queues and exchanges
    sleepy.apply_async(args=("nothing",), queue="Q1")
    sleepy.apply_async(args=("nothing",), queue="Q2")
    sleepy.apply_async(args=("nothing",), queue="Q3")
    
    # run the test
    queue_them()
    

    Then, in another shell, run celery:

    $ celery worker -A myapp.celery --pool=prefork --concurrency=2 -Ofair --queues=Q1,Q3,Q2
    
    [2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T1
    [2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T0
    [2020-05-05 21:59:12,052] WARNING [celery.redirected:235] Processing: Q1-T2
    [2020-05-05 21:59:12,053] WARNING [celery.redirected:235] Processing: Q1-T3
    [2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T5
    [2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T4
    [2020-05-05 21:59:13,062] WARNING [celery.redirected:235] Processing: Q1-T6
    [2020-05-05 21:59:13,063] WARNING [celery.redirected:235] Processing: Q1-T7
    [2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T9
    [2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T8
    [2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q1-T10
    [2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q3-T0
    [2020-05-05 21:59:14,571] WARNING [celery.redirected:235] Processing: Q2-T0
    [2020-05-05 21:59:14,572] WARNING [celery.redirected:235] Processing: Q2-T1
    [2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q1-T11
    [2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q2-T2
    [2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q2-T3
    [2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q1-T12
    [2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q1-T13
    [2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q2-T4
    [2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q1-T14
    [2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q2-T5
    [2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q1-T15
    [2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q2-T6
    [2020-05-05 21:59:17,591] WARNING [celery.redirected:235] Processing: Q1-T16
    [2020-05-05 21:59:17,592] WARNING [celery.redirected:235] Processing: Q2-T7
    [2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q1-T17
    [2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q2-T8
    [2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q1-T18
    [2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q2-T9
    [2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T19
    [2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T20
    [2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T21
    [2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T22
    [2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q1-T23
    [2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q2-T10
    [2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q1-T24
    [2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q2-T11
    [2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T25
    [2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T26
    [2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q2-T12
    [2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q1-T27
    [2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q1-T28
    [2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q2-T13
    [2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q2-T14
    [2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q1-T29
    [2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T31
    [2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T30
    [2020-05-05 21:59:23,631] WARNING [celery.redirected:235] Processing: Q2-T15
    [2020-05-05 21:59:23,632] WARNING [celery.redirected:235] Processing: Q1-T32
    [2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q1-T33
    [2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q2-T16
    [2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T17
    [2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T18
    [2020-05-05 21:59:25,138] WARNING [celery.redirected:235] Processing: Q2-T19
    [2020-05-05 21:59:25,139] WARNING [celery.redirected:235] Processing: Q1-T34
    [2020-05-05 21:59:25,641] WARNING [celery.redirected:235] Processing: Q1-T35
    [2020-05-05 21:59:25,642] WARNING [celery.redirected:235] Processing: Q2MOAR-T0
    [2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T36
    [2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T37
    [2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q2MOAR-T1
    [2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q1-T38
    [2020-05-05 21:59:27,153] WARNING [celery.redirected:235] Processing: Q2MOAR-T2
    [2020-05-05 21:59:27,154] WARNING [celery.redirected:235] Processing: Q1-T39
    [2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T3
    [2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T4
    [2020-05-05 21:59:28,159] WARNING [celery.redirected:235] Processing: Q2MOAR-T5
    [2020-05-05 21:59:28,160] WARNING [celery.redirected:235] Processing: Q1-T40
    [2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q2MOAR-T6
    [2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q1-T41
    [2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q2MOAR-T7
    [2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q1-T42
    

    When celery is run with a concurrency of 1, the results are similar:

    [2020-05-05 22:01:33,879] WARNING [celery.redirected:235] Processing: Q1-T0
    [2020-05-05 22:01:34,385] WARNING [celery.redirected:235] Processing: Q1-T1
    [2020-05-05 22:01:34,888] WARNING [celery.redirected:235] Processing: Q1-T2
    [2020-05-05 22:01:35,391] WARNING [celery.redirected:235] Processing: Q1-T3
    [2020-05-05 22:01:35,894] WARNING [celery.redirected:235] Processing: Q1-T4
    [2020-05-05 22:01:36,397] WARNING [celery.redirected:235] Processing: Q1-T5
    [2020-05-05 22:01:36,899] WARNING [celery.redirected:235] Processing: Q3-T0
    [2020-05-05 22:01:37,404] WARNING [celery.redirected:235] Processing: Q2-T0
    [2020-05-05 22:01:37,907] WARNING [celery.redirected:235] Processing: Q2-T1
    [2020-05-05 22:01:38,411] WARNING [celery.redirected:235] Processing: Q1-T6
    [2020-05-05 22:01:38,913] WARNING [celery.redirected:235] Processing: Q2-T2
    [2020-05-05 22:01:39,417] WARNING [celery.redirected:235] Processing: Q2-T3
    [2020-05-05 22:01:39,919] WARNING [celery.redirected:235] Processing: Q2-T4
    [2020-05-05 22:01:40,422] WARNING [celery.redirected:235] Processing: Q1-T7
    [2020-05-05 22:01:40,925] WARNING [celery.redirected:235] Processing: Q2-T5
    [2020-05-05 22:01:41,429] WARNING [celery.redirected:235] Processing: Q1-T8
    

    【Comments】:

    • With round-robin I would expect a consistent Q1 > Q2 > Q3 order; the logs show very different behaviour, don't they?