【问题标题】:Using Spring MessageListener to consume messages in groups?使用 Spring MessageListener 分组消费消息?
【发布时间】:2013-07-13 02:55:35
【问题描述】:

我有一个 spring 应用程序,我想使用 JMS Message Groups 来处理特定块中的 JMS 消息(以及相同的事务等)。基本上说我有 5 个相关事件,我有一个 JMSTemplate 以相同的 JMSXGroupID 和连续的 JMSXGroupSeq 值发送它们。

然后我在 Spring 中定义了一个 MessageProcessorService,看起来像这样:

<bean id="messageProcessorService" class="x.y.z.MessageProcessorService"/>
<jms:listener-container connection-factory="pooledJmsConnectionFactory" concurrency="5" >
    <jms:listener destination="messages.queue" ref="messageProcessorService" />
</jms:listener-container>

我的MessageProcessorService是标准的,简单的:

@Service
public class MessageProcessorService implements MessageListener {

public void onMessage(Message msg) { ... }
}

问题是,因为 onMessage 一次只能收到 1 条消息。如何获取特定组中的所有 5 条消息,然后开始处理它们?

我知道我可以使用负的JMSXGroupSeq 值来标记组的结束,然后我想我可以保留一小部分消息并检查消息JMSXGroupSeq 并且当它为-1 时处理整个组,但这似乎有点 hacky 并且不确定它是否是线程安全的(我肯定需要并行处理多个线程)。

其他人之前在 Spring/JMS/ActiveMQ 中做过类似的事情吗?

【问题讨论】:

    标签: java spring jms activemq


    【解决方案1】:

    为了回答我自己的问题,我找到了this approach 并做了大致相似的事情。但它并不漂亮。我认为从长远来看,我们可能不得不放弃 MessageListenerContainer 并使用直接 JMS api 推出我们自己的解决方案。

    【讨论】:

    【解决方案2】:

    很好的问题。虽然消息组对于某些任务非常有用,但它们并没有将这种优势映射到消息侦听器上。

    您可以采用自己的方法,在消息中保留一个列表。如果您在原型范围内创建侦听器 bean,线程安全将不会成为问题。

    <bean id="messageProcessorService" class="x.y.z.MessageProcessorService" scope="prototype"/>
    

    http://static.springsource.org/spring/docs/3.0.7.RELEASE/reference/beans.html#beans-factory-scopes-prototype

    您只需要保留一些在课堂上收到的消息的内部列表,它们对于该线程来说将是唯一的。

    这在一定程度上取决于您的交易要求。您是否需要围绕整个消息组提交事务?然后,您需要跟踪何时提交和何时不提交。 IBM 在清单 5 中做了一个示例(虽然不使用 ActiveMQ)here。它不是 Spring,而是普通的 MDB。虽然不能说它与 ActiveMQ 的工作原理相同,所以我不保证。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-06-15
      • 2020-08-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-29
      • 1970-01-01
      • 2016-11-11
      相关资源
      最近更新 更多