【问题标题】:Rejecting and requeueing a RabbitMQ task when prefetch_count == 1prefetch_count == 1 时拒绝和重新排队 RabbitMQ 任务
【发布时间】:2014-06-20 18:52:23
【问题描述】:

假设我有一个包含五个项目的队列:

(tail) E, D, C, B, A (head)

我从这个队列的头部消费消息,但认为消息A 目前不适合处理。我用requeue=Truereject那个项目,队列变成:

(tail) A, E, D, C, B (head)

然后我消费BCDEack每一个。现在队列只保存A,我不断地使用它,reject 在一个永无止境的循环中一遍又一遍。如果有新的非A 消息进入,它几乎立即被消费,然后进程继续尝试消费A 的循环。

我对 Pika 文档中的 Twisted Consumer Example 稍作修改:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

问题:注意以下注释掉的行:

#yield channel.basic_qos(prefetch_count=1)

当我取消注释,并且消费者收到消息 A,它会在 rejecting 之后立即再次接收它,忽略它后面的队列中可能等待的任何其他项目。它没有将被拒绝的项目放在队列的尾部,而是不断地重复尝试,完全阻塞队列中的所有其他项目。

注释掉该行后,它可以正常工作(尽管速度较慢)。如果该行存在并且prefetch_count > 1,它也可以工作。将其设置为精确的 1 会触发此行为。

在拒绝邮件A 时我是否缺少步骤?还是 Pika 的预取系统与这种极端情况根本不兼容?

【问题讨论】:

    标签: python rabbitmq pika


    【解决方案1】:

    如果您只有一个消费者,那么 RabbitMQ 只能将消息发送给被拒绝的同一消费者(无论如何:使用 basic.reject 或 basic.nack)。

    当您设置prefetch_count > 1 时,您的消费者将拥有您的循环消息加上循环旁边头部的新消息(字面意思是,您的循环消息将保持在头部)。

    如果您不小心收到N*Mprefetch_count <= N 和消费者编号M 的循环消息,您将循环所有消息(这会导致CPU 烧毁等),所以这可能是一个很好的捕获检查rejected 消息标志,如果消息已经重新传递,则具有一些高级逻辑。

    【讨论】:

    • 假设我不介意通过重复循环相同的消息来消耗 CPU 时间。有没有办法从预取队列中完全拒绝该消息,以便它后面的任何有效消息都可以在循环返回之前通过?
    • 您可以完全丢弃无效消息,也可以使用Dead Lettering 将其放入其他队列中,以便稍后手动或以其他方式处理,但这取决于您的消息流的活跃程度。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-01
    • 2012-12-14
    • 1970-01-01
    相关资源
    最近更新 更多