【问题标题】:RabbitMq Unacknowledged Messages after Consumer basic_cancel消费者 basic_cancel 之后的 RabbitMq 未确认消息
【发布时间】:2019-02-04 16:32:03
【问题描述】:

好的,无需详细介绍我设置的整个系统,

我遇到的问题是,当消费者取消 (AMQPChannel->basic_cancel) 侦听队列时,它会留下一条额外的消息,该工作人员未确认。它也不会触发正常的回调来处理此消息。

一些细节

  • 队列阻塞(使用等待)while(count($channel->callbacks)) { $channel->wait( ... ) ... }
  • 预取为 1,您可以拥有的最小值
  • 消费者可以动态收听 (AMQPChannel->basic_consume)
  • 消费者可以动态忘记 (AMQPChannel->basic_cancel)

我不会详细说明我告诉给定消费者消费或取消给定队列的确切方式。但这一切都完美地开始消费或取消就好了。但是,当我取消一个仍然有消息的队列时,他们只是忘记了最后一条消息,并且我相信取消会删除该队列的回调,因此除了杀死不希望的消费者之外,没有办法恢复该消息。

我做了一些调试(只是一个例子,不是我实际做的)

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
   $tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
   $this->getAmqpChannel()->basic_cancel( $tag );
   Debug::dump($this->getAmqpChannel()->getMethodQueue());

这个的输出大概是

  array()
  RunCommand: basic_cancel //this works fine consumer forgets queue except ->
  array(1){
    [0] => array(3){
        [0] => string(5) "60,60",
        [1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
       [2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
            ["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
            ["DELIVERY_MODE_PERSISTENT":constant] => int(2),
            ["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
            ["body_size":public] => int(135864),
            ["is_truncated":public] => bool(false),
            ["content_encoding":public] => null,
            ["propertyDefinitions":protected static] => array(14){ ... }
            ["delivery_info":public] => array(0){},
            ["prop_types":protected] => array(14){ ... }
      }
    }

一旦工作人员死亡(或者我宁可杀死它),消息会被放回队列中,我可以在 RabbitMq 管理事物(插件)中的获取消息下将其拉出。就是这样,

  Properties
    correlation_id: 32:38
    delivery_mode:  2
    content_encoding:   text/plain
    content_type:   application/json

"correlation_id":32,"max_correlation_id":38 对应于correlation_id: 32:38,因为我需要跟踪消息部分。所以我知道这是同样的信息。

那么,为什么在我取消后,我会收到最后一条卡在僵尸领域的消息,并且无论如何都要将其踢回队列而不杀死消费者。

此外,这不是一次性的,每次我取消其中仍有消息的队列时都会发生这种情况。所以它与给定的消息无关。就好像它得到了最后一条预取消息,然后因为它被取消了,所以最后一条消息没有回调来运行,它只是陷入了困境。请记住 0 预取是获取所有消息,1 是您可以设置的最低值。

任何可以提供的帮助都会很棒。

更新

我可以通过调用来解决问题

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

basic_cancel 之前或之后

这会拒绝该消息,我什至可以如上所示测试$this->getAmqpChannel()->getMethodQueue() 以查看我正在取消的$queue 是否有消息被封存(尚未实现)。

我试图避免使用recover,但我认为这应该没问题,因为消费者使用单一渠道并且被阻塞,最坏的情况是它只会拒绝有效消息 1 次,虽然不理想应该是可以接受的.

但是,在某些情况下,我从 Rabbit 那里得到了一个额外的异常,

  PRECONDITION_FAILED - unknown delivery tag {n}

如果有人对这个额外的错误有任何详细信息,那就太好了。而且所有的队列都需要 Ack,它们都不是自动的。

更新1

我在堆栈跟踪中注意到queue_unbind,所以我所做的是在内部跟踪绑定,这样我可以确保取消绑定只执行一次。我明天做更多测试后会发布一些代码,但我在实施后的初始测试不再产生错误。

所有这些听起来可能有点“奇怪”,我可以解释为什么以及我在做什么,但这可能超出了问题的范围。我会说我已经在生产中使用了这个系统超过 2 年(我设计了它),我们每分钟可以进行 18 万次搜索(如果考虑系统的所有部分,大约 100 次)。我们还使用它进行了超过 2800 亿次搜索,感觉是我构建的。我们现在也是我们行业中的领先公司,要么淘汰了我们的竞争对手,要么他们把他们的东西寄给我们,不再在内部做。这在很大程度上是因为我们的快速周转以及我们的数据质量。所以这个系统确实有效,而且效果很好。

但在最近的审计中,我注意到 Daily 消费者只需处理大约 1000 万行(大约 100 分钟的工作),而 Nightly 消费者只需处理大约 1 亿行(或大约 20 小时的工作)。 Daily 消费者可以在夜间工作,但只能在工作时间之外(因为它减少了白天的响应时间),因此有大约 10 小时的窗口,夜间工作只能在更小、功能更差的服务器上运行。这给我们的解决方案是,如果没有每日作业(客户提交的作业),它们可以动态地动态交换到夜间工作(数据仓库)。这应该保持大部分响应能力,同时在没有提交作业时不会浪费资源。我们可以在搜索中随心所欲地横向扩展,但我们确实为我们的主服务器支付了很多费用,并且浪费了大约 8 个小时的工作时间。

我可能会写一本小书来说明它是如何工作的,但希望这能让我对我的工作有一些基本的了解。我还受到合同中一些保密和非竞争条款的约束,因此我可以真正了解具体细节。

【问题讨论】:

    标签: php rabbitmq php-amqp


    【解决方案1】:

    Consumer 在 RabbitMQ 用语中表示队列中的订阅者。 (有关通道、消费者和连接之间差异的详细信息,请参阅this answer)。

    当您打开确认时,它们会为channel 打开。在该频道上传递的任何消息都将有一个与之关联的delivery tag。处理完消息后,您需要通过同一通道告诉服务器该消息已被处理。取消消费者对已传递消息的确认没有影响。事实上,接收消息、取消消费者、处理消息、然后发送确认将是一个完全有效的用例。

    因此,您有两个选择。您可以不确认消息,在这种情况下,您所要做的就是关闭通道,它将在队列的头部重新排队。或者,您可以确认它(nackack),在这种情况下,如果 nack,消息将重新排队,如果 ack,则丢弃消息。

    如果我没记错的话,不指定预取计数(通过basic.qos)将导致预取为零,这意味着您必须在接收下一条消息之前确认上一条消息。我可能错了。当然,如果您使用basic.get,您可以完全避免这个问题,而且对性能的影响很小。

    【讨论】:

    • 但是消息没有正确传递,因为它从未触发回调来处理消息。它只是陷入困境,因为在 AMQPChannel unset($this->callbacks[$consumerTag]);
    • 我也知道 Chanel 和消费者之间的区别,解释一下,消费者监听多个队列,在这些队列中专门用于发送消费者命令,其中一个命令可以是取消给定队列,因此消费者自己从队列中取消。此外,通道被消费者重复使用,因此除了杀死消费者之外,没有简单的方法来关闭通道。
    • 好的,那么你没有正确使用它。消费者不应监听多个队列。尽管可以,但这是一个坏主意。此外,消费者应该有自己的专用频道。同样,该协议允许其他情况,但这是一个坏主意 - 原因是问题的根源。
    • NOT specifying a prefetch count (via basic.qos) will result in prefetch of zero, 看到它没有默认值并且它是不可能的必需参数,public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    • 不,我是说根本不要调用它。或者你可以试试basic_qos(1, 0, false) 看看是否有帮助。最终需要在取消消费者或nack消息后关闭通道,否则服务器认为正在处理最后一条消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多