【问题标题】:ActiveMQ with JMS topic - some messages not dequeued by consumer带有 JMS 主题的 ActiveMQ - 一些消息没有被消费者出列
【发布时间】:2014-01-12 10:14:07
【问题描述】:

我们正在尝试将 ActiveMQ 5.9.0 设置为使用 JMS 主题的消息代理,但我们在使用消息时遇到了一些问题。

出于测试目的,我们有 1 个主题、1 个事件生产者和 1 个消费者的简单配置。我们一个接一个地发送 10 条消息,但是每次我们运行应用程序时,其中 1-3 条消息没有被消费!其他消息被消费并处理得很好。 我们可以在 ActiveMQ 管理控制台中看到我们发布到主题的所有消息,但它们永远不会到达消费者,即使我们重新启动应用程序(我们可以看到“入队”和“出队”列中的数字不同)。

编辑:我还应该提到,当使用队列而不是主题时,不会出现这个问题。

为什么会这样?它可能与atomikos(即事务管理器)有关吗?或者可能是配置中的其他内容?欢迎任何想法/建议。 :)

这是 ActiveMQ/JMS spring 配置:

    <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="amq" />
    <property name="xaConnectionFactory">
        <bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"
            p:brokerURL="${activemq_url}" />
    </property>
    <property name="maxPoolSize" value="10" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="cachedConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
</bean>

<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="pubSubDomain" value="true"/>
</bean>

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="test.topic" />
</bean>

<!-- The Spring message listener container configuration -->
<jms:listener-container destination-type="topic"
    connection-factory="connectionFactory" transaction-manager="transactionManager"
    acknowledge="transacted" concurrency="1">
    <jms:listener destination="test.topic" ref="testReceiver"
        method="receive" />
</jms:listener-container>

制作人:

@Component("producer")
public class EventProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Transactional
    public void produceEvent(String message) {
        this.jmsTemplate.convertAndSend("test.topic", message);
    }
}

消费者:

@Component("testReceiver")
public class EventListener {

    @Transactional
    public void receive(String message) {
        System.out.println(message);
    }  
}

测试:

    @Autowired
    private EventProducer eventProducer;

    public void testMessages() {

    for (int i = 1; i <= 10; i++) {
        this.eventProducer.produceEvent("message" + i);
    }

【问题讨论】:

    标签: activemq spring-jms atomikos jms-topic


    【解决方案1】:

    这就是 JMS 主题的本质——默认情况下只有当前订阅者会收到消息。你有一个竞争条件,并且在消费者建立订阅之前发送消息,在容器启动之后。对于您在同一应用程序中发送和接收主题的单元/集成测试,这是一个常见错误。

    对于较新版本的 Spring,有一个 method you can poll to wait until the subscriber is established(我认为是从 3.1 开始)。或者,您可以稍等片刻再开始发送,或者您可以使您的订阅持久。

    【讨论】:

    • 嗨,Gary,感谢您的回答,但我不确定我是否理解。首先,我们没有使用 JUnit,这只是我们创建的一个简单场景,用于在添加更多主题、侦听器等之前检查配置是否有效。此外,丢失的消息不一定是我们发送的第一个消息(所以有些消息在丢失的之前已经收到)。我们尝试在开始发送之前添加 Thread.sleep(2000),但它并没有解决问题,而且我们也不会在实际场景中这样做......我会查看您提供的链接,看看是否它有助于。 :)
    • 另外,你能举个例子来说明如何在 spring xmls 中配置持久订阅吗?谢谢!:)
    • 我更改了答案以删除对 JUnit 的引用;我的意思是一般的测试,你在同一个应用程序中发送和接收。确保上下文完全刷新并且消费者在发送消息之前已经启动。查看 TRACE 级别的日志以观察消费者开始消费。持久订阅:在此处查看 clientId、destinationType 和订阅:docs.spring.io/spring-framework/docs/current/… 请记住,在第一次订阅完成之前订阅不会持久(因此您的第一次尝试可能会失败)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-22
    • 2020-12-09
    • 2013-08-13
    • 2014-05-21
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多