【问题标题】:Content-based consumer priority in ActiveMQActiveMQ 中基于内容的消费者优先级
【发布时间】:2021-07-23 19:33:33
【问题描述】:

我正在与需要加载大量资源来处理消息内容的消费者合作。同一家族中的消息需要相同的资源。

我尝试使用消息分组来确保将同一家族中的消息提供给已加载资源的消费者。但是,我希望消费者优先处理同一家族中的消息,而不是排他性(加载资源需要时间,处理消息需要更长的时间)。例如,如果消费者 1 和 2 可用,则 ID 为 group1 的消息应始终提供给消费者 1,而 ID 为 group2 的消息应始终提供给消费者 2。如果只有消费者 2 可用,则它应该消费任何信息。这样一来,消费者 1 对消息 group1 具有优先权,但没有排他性。同样,消费者 2 对消息 group2 具有优先权,但没有排他性。

有没有办法让 AMQ 中的消息分组不给消费者排他性?还是一种根据内容动态设置消费者优先级的方法?

我正在使用 ActiveMQ 5.16.2,但如果需要,我可以切换到 Artemis。

【问题讨论】:

  • ActiveMQ 版本 5.16.2(添加到原始消息中)
  • 如果消费者 1 和 2 可用:ID 为 'group1' 的消息应始终提供给消费者 1 ; ID 为“group2”的消息应始终提供给消费者 2。如果只有消费者 2 可用,它应该使用任何消息。这样,消费者 1 对消息“group1”具有优先权,但没有排他性。同样,消费者 2 对消息“group2”具有优先权,但没有排他性。

标签: activemq


【解决方案1】:

消息分组的全部意义在于序列化消息的消费。最简单的方法是确保组中的所有消息都分派给同一个使用者。因此,消息分组不可能不赋予消费者排他性,因为这首先会破坏分组的目的。也就是说,单个消费者可以同时接收来自多个组的消息。当群组多于消费者时,就会发生这种情况。

还值得注意的是,您无法控制哪个消费者消费哪个组。分组的重点只是 一个 消费者从组中获取消息,而不是 哪个 一个。

消费者优先级由消费者自己设置,如the documentation 所述。它不是基于正在发送的消息的内容。

您可以尝试在消息上设置一个属性来反映他们所在的“家庭”,然后您的消费者可以使用选择器来使用他们关心的消息。

请记住,您的客户端应用程序中始终可以有多个消费者。每个客户端可能有两个消费者 - 一个具有选择器的高优先级消费者,然后是一个没有选择器的低优先级消费者。代理将优先使用选择器的消费者,但如果没有匹配,它将向没有选择器的消费者发送消息。

【讨论】:

  • 是的,我在浏览文档时也想到了很多。我考虑过使用选择器,但它让我的消费者忽略了选择器过滤器之外的所有内容。理想情况下,我可以使用选择器来获取消费者喜欢的消息。如果不存在,它将接受任何其他消息。但我不认为选择器是这样工作的。看来我的用例没有得到处理,我需要想办法在 AMQ 之外做到这一点
  • 我在回答中添加了更多细节,以(希望)解决您的具体问题。我认为你可以用 ActiveMQ 做你想做的事。
  • 这 2 个答案给了我一些思考。我正在测试几种方法。我需要一些时间才能形成相关的反馈
【解决方案2】:

在这个用例中,我建议过滤消息组。我认为您正在寻求对消息流进行分区,其中消息组提供的竞争消费者 模式不能满足您的需求。如果您要求这些流由特定的消费者处理,那么您最终希望让该消费者拥有一个 HA 备用或并行消费者 - 这将是划分或过滤流量的额外驱动程序。

有两种方法 - 您可以使用虚拟目标(通过具有过滤功能的 CompositeQueue 等)进行服务器驱动的配置,或者使用虚拟主题和选择器进行客户端驱动的配置。

这是客户驱动的方法。

  1. 发布到虚拟主题(即 topic://VT.ORDER.EVENT)
  2. 从虚拟消费者队列中消费(即 queue://VQ.APP1.VT.ORDER.EVENT,queue://VQ.APP2.VT.ORDER.EVENT)
  3. 让消费者与选择器连接

每组消息都会到达自己的队列,每个队列可以有 1 .. n 个消费者。

奖励:您仍然可以让消费者通过注册通配符选择器或直接订阅主题来监控完整的流量。

<destinationInterceptors> 
  <virtualDestinationInterceptor> 
    <virtualDestinations> 
      <virtualTopic name="VT.>" prefix="VQ.*." selectorAware="true"/>   
    </virtualDestinations>
  </virtualDestinationInterceptor> 
</destinationInterceptors>

持久化选择器:

<plugins>
  <virtualSelectorCacheBrokerPlugin persistFile="<some path>${activemq.data}/virtual-topic-selectorcache.data" />
</plugins>

参考:Virtual Topics

【讨论】:

  • 我的解决方案基于此。事实上,竞争的消费者模式并没有满足我的需要。我最终创建了一个 DISPATCH 主题,服务器在该主题中通信它们已加载的资源。使用该信息,调度程序将每条消息分配给队列 SERVER1 / SERVER2,其中 server1 和 2 有消费者。调度程序在那些队列上也有较低优先级的消费者。如果调度程序从这些队列中消费一条消息,这意味着没有可用的消费者,它可以重新分配消息。如果服务器因任何原因脱机,这会有所帮助。这种方法的性能影响似乎可以接受
猜你喜欢
  • 2020-07-18
  • 2014-12-08
  • 1970-01-01
  • 2019-03-08
  • 1970-01-01
  • 2012-02-09
  • 2020-04-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多