【问题标题】:How to selectively delete messages from an AMQP (RabbitMQ) queue?如何有选择地从 AMQP(RabbitMQ)队列中删除消息?
【发布时间】:2011-03-26 23:21:39
【问题描述】:

我想有选择地从 AMQP 队列中删除消息,甚至不读取它们。

场景如下:

发送方希望基于 X 类型的新信息到达的事实使 X 类型的消息过期。因为订阅者很可能还没有消费 X 类型的最新消息,所以发布者应该删除以前的 X 类型消息并将最新的消息放入队列中。整个操作对订阅者应该是透明的——事实上他应该使用像 STOMP 这样简单的东西来获取消息。

如何使用 AMQP 做到这一点?或者也许在另一个消息传递协议中更方便?

我想避免复杂的基础架构。所需的整个消息传递就像上面一样简单:一个队列、一个订阅者、一个发布者,但发布者必须能够根据给定条件临时删除消息。

发布者客户端将使用 Ruby,但实际上,只要我发现协议中的方法,我就会处理任何语言。

【问题讨论】:

    标签: message-queue messaging rabbitmq amqp


    【解决方案1】:

    如果您只想从队列中删除前 n 条消息,它似乎也可以在 RabbitMQ Web-UI 中工作

    • 从“队列”选项卡中选择队列,向下滚动到“获取消息”部分
    • 设置参数“Requeue=No”和要从队列中删除的消息数
    • 按“获取消息”按钮

    【讨论】:

      【解决方案2】:

      这个问题由于它的标题而具有很高的知名度。通过描述更具体的场景。 因此,对于那些希望从队列中实际删除下一条(记住 FIFO)消息的用户,您可以使用 rabbitmqadmin 并发出以下命令:

      rabbitmqadmin get queue=queuename requeue=false count=1

      这个命令本质上是在消费消息并且什么都不做。带有标志以备份消息的完整命令可能如下所示。确保根据您的要求添加任何其他参数。

      sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

      【讨论】:

        【解决方案3】:

        您不需要消息队列,您需要键值数据库。例如,您可以使用 Redis 或 Tokyo Tyrant 来获得一个简单的网络可访问的键值数据库。或者只使用内存缓存。

        每种消息类型都是一个键。当您使用相同的键编写新消息时,它会覆盖以前的值,因此该数据库的读取器将永远无法获取过时的信息。

        此时,您只需要一个消息队列来确定读取密钥的顺序,如果这很重要的话。否则,只需不断地扫描数据库。如果您确实不断地扫描数据库,最好将数据库放在阅读器附近以减少网络流量。

        我可能会做这样的事情 key: typecode value: lastUpdated, important data

        然后我会发送包含 typecode, lastUpdated 这样,读者可以将该键的 lastupdated 与他们上次从数据库中读取的键进行比较,并跳过读取它,因为它们已经是最新的。

        如果您确实需要使用 AMQP 执行此操作,请使用 RabbitMQ 和自定义交换类型,特别是 Last Value Cache Exchange。示例代码在这里https://github.com/squaremo/rabbitmq-lvc-plugin

        【讨论】:

          【解决方案4】:

          您目前无法在 RabbitMQ(或更一般地说,在 AMQP)中自动执行此操作。但是,这里有一个简单的解决方法。

          假设您要发送三种类型的消息:Xs、Ys 和 Zs。如果我正确理解您的问题,当 X 消息到达时,您希望代理忘记所有其他尚未传递的 X 消息。

          这在 RabbitMQ 中相当容易做到:

          • 生产者声明了三个队列:X、Y 和 Z(它们会自动绑定到默认交换器,并将它们的名称作为路由键,这正是我们想要的),
          • 发布消息时,生产者首先清除相关队列(因此​​,如果它正在发布 X 消息,它首先清除 X 队列);这有效地删除了过时的消息,
          • 消费者只需从它想要的队列中消费(X 表示 X 条消息,Y 表示 Y 条消息等);从它的角度来看,它只需要执行 basic.get 即可获得下一条相关消息。

          当两个生产者几乎同时发送相同类型的消息时,这意味着竞争条件。结果是一个队列可能同时有两个(或更多)消息,但是由于消息的数量是生产者数量的上限,并且因为多余的消息在下一次发布时被清除,这应该不是什么大问题。

          总而言之,这个解决方案比最优解决方案多了一个步骤,即在发布 X 类型的消息之前清除队列 X。

          如果您在设置此配置时需要任何帮助,寻求建议的最佳地点是 rabbitmq-discuss 邮件列表。

          【讨论】:

          • 在我的情况下,使用队列的唯一原因是 X、Y、Z 事件是交错的,订阅者应该按照它们来的顺序读取它们——但只有最新的 X、最新的 Y 和最新的 Z。另外,类型的数量是数千个,所以订阅者不会监听数千个队列。
          • 啊。收听数千个队列不会有问题(几乎没有开销)。一旦消息在队列中,您就无法真正选择性地删除消息。我突然想到,您想要的是一个支持原子移动操作的文件服务器:发布者开始写入一个临时文件,完成后,将该文件移动(重命名)到 X;然后客户端只读取文件 X。
          猜你喜欢
          • 1970-01-01
          • 2020-01-10
          • 1970-01-01
          • 1970-01-01
          • 2015-12-20
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多