【问题标题】:Is it possible to ensure unique messages are in a rabbitmq queue?是否可以确保唯一的消息在 rabbitmq 队列中?
【发布时间】:2012-04-26 15:51:29
【问题描述】:

基本上,我的消费者也是生产者。我们得到一个初始数据集并将其发送到队列。消费者拿走一件物品并处理它,从那时起有 3 种可能性:

  1. 数据很好,正在为存储放置一个“好”队列
  2. 数据错误并被丢弃
  3. 数据既不好(还)也不好(还),因此数据被分解成更小的部分并送回队列进行进一步处理。

我的问题在于第 3 步,因为队列起初增长得非常快,因此有可能将一段数据分解为队列中重复的部分,而消费者继续处理它并最终陷入无限循环.

我认为防止这种情况的方法是防止重复进入队列。我不能在客户端执行此操作,因为在一个小时的过程中,我可能有许多核心处理数十亿个数据点(让每个客户端在提交之前对其进行扫描会使我的速度太慢)。我认为这需要在服务器端完成,但是就像我提到的那样,数据非常大,我不知道如何有效地确保没有重复。

我可能会问不可能的问题,但我想我会试一试。任何想法将不胜感激。

【问题讨论】:

    标签: queue rabbitmq


    【解决方案1】:

    我认为即使您可以解决不向队列发送重复的问题,您迟早会遇到这个问题:

    来自 RabbitMQ 文档:“从故障中恢复:如果由于客户端连接到的节点故障而导致客户端与代理断开连接,如果客户端是发布客户端,代理可以已经接受并传递了来自客户端的消息,而客户端没有收到对它们的确认;同样在消费端,客户端可能已经对消息发出确认,并且不知道这些确认是否已发送给代理,并且在故障发生之前就已经处理好了。简而言之,您仍然需要确保您的消费客户端能够识别和处理重复的消息。”

    基本上,它看起来像这样,您向 rabbitmq 发送请求,rabbitmq 回复 ACK 但出于某种原因,您的消费者或生产者没有收到此 ACK。 Rabbitmq 无法知道未收到 ack,并且您的生产者最终将重新发送消息,从未收到 ack。

    处理重复消息很痛苦,尤其是在将消息用作一种 RPC 的应用程序中,但在使用这种消息架构时,这似乎是不可避免的。

    【讨论】:

      【解决方案2】:

      核心问题似乎是这样的:

      "...its possible that a piece of data is broken down into a part that's 
      duplicated in the queue and the consumers continue to process it and 
      end up in a infinite loop."
      

      您可以随心所欲地关注排队物品的独特性,但上述问题是您应该集中精力的地方,IMO。防止无限循环的一种方法可能是在您的消息负载中添加一个“已访问”位,该位由消费者在重新排队分解的项目之前设置。

      另一种选择是让消费者重新排队回到一个特殊的队列,该队列的处理方式略有不同,以防止无限循环。无论哪种方式,您都应该通过将其作为应用程序策略的核心部分来解决问题,而不是使用消息传递系统的功能来绕过它。

      【讨论】:

      • 我正在努力做到这一点(我认为)。通过确保没有重复的过去项目,我确保不会多次处理相同的数据。我只是确定rabbitmq中的实现,有没有办法简单地发送消息ID并让rabbitmq丢弃重复项或者我需要设置一个过滤器或其他东西(如果我这样做它如何与rabbitmq一起工作)。
      • 没有办法做到这一点,AFAIK。 Rabbit 不关心您的消息内容或队列中已有的内容,因此这取决于您的应用程序。
      • 所以,如果我的消息 ID 是唯一的(我的实际数据的哈希码),我需要将它们存储在数据库或其他东西中并针对它进行查询(查看之前是否发送过 msg ID)在发送给兔子之前?我一直在考虑这一点,但它需要客户端在我的消息服务器等待时进行一些查询(我试图看看我是否可以将这项工作推送到消息服务器本身)
      • 另一个问题,与我上面的问题有关,使用rabbitmq我可以在将某些内容发送到队列时触发一个进程,以便我可以过滤它还是需要在发送之前将它发送到另一个程序进行过滤排队吗?
      • 为什么不在每条消息中添加一个始终递增的数字?在它重新排队之前,与之打交道的消费者可以读取并增加有效负载中的数字,因为它将有效负载推回队列中。然后,任何阅读消息的消费者都可以拒绝其中任何一个数字高于特定阈值(例如,10)的消息。它解决了无限循环问题,不需要跟踪唯一性,也不需要任何额外的数据库。
      【解决方案3】:

      有一个用于 rabbitmq 的插件,可以让您使用一些额外的标头来进行这种类型的控制。

      您应该启用插件并在消息上定义x-deduplication-header,使用哈希或唯一标识发送的消息的东西,因此当具有相同标头值的其他消息进入rabbitmq的交换时,它不会被路由到任何队列。

      见:https://github.com/noxdafox/rabbitmq-message-deduplication

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2013-06-09
        • 2023-04-08
        • 2021-12-23
        • 2021-07-21
        • 2013-08-09
        • 1970-01-01
        • 2020-02-03
        • 2017-01-23
        相关资源
        最近更新 更多