【问题标题】:Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queuePika + RabbitMQ:将 basic_qos 设置为 prefetch=1 似乎仍然会消耗队列中的所有消息
【发布时间】:2012-09-07 18:15:36
【问题描述】:

我有一个 python 工作程序客户端,它启动了 10 个工作程序,每个工作程序连接到一个 RabbitMQ 队列。有点像这样:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

我遇到的问题是,尽管在通道上设置了 basic_qos,但第一个启动的工作人员接受了队列外的所有消息,而其他工作人员则坐在那里闲置。我可以在 rabbitmq 界面中看到这一点,即使我将 worker_count 设置为 1 并将 50 条消息转储到队列中,所有 50 条消息都进入“未确认”存储桶,而我希望 1 变为未确认,而另一个49 准备好了。

为什么这不起作用?

【问题讨论】:

    标签: rabbitmq pika qos


    【解决方案1】:

    我似乎已经通过移动调用basic_qos 的位置解决了这个问题。

    将它放在 channel = connection.channel() 之后似乎会改变我所期望的行为。

    【讨论】:

    • 谢谢!那确实解决了这个问题。顺便说一句,这很难调试..
    • @Hiagara 是的,今天我自己也遇到了这个问题。令人惊讶的是,将近 5 年后,API 中仍未明确或记录这一点。
    • 我认为我们应该在basic_consume 之前声明basic_qos。因为basic_consume在初始化的时候使用了这个设置。
    • 同意@rborodinov。我在basic_consume 之后有basic_qos,但它没有用。切换它们,现在可以正常工作了。
    • 在设置basic_consume 时,我还必须设置auto_ack=False 才能正常工作。否则它仍然消耗的消息比预期的要多。
    猜你喜欢
    • 1970-01-01
    • 2019-06-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-12-18
    • 2020-09-12
    • 2018-08-10
    相关资源
    最近更新 更多