【问题标题】:ActiveMQ failover reconnect message rolled backActiveMQ 故障转移重连消息回滚
【发布时间】:2014-09-04 09:35:20
【问题描述】:

我正在尝试找到解决问题的方法,以确保消息仅被单个消费者完全过度处理。

队列中有大量消息,许多消费者读取消息并将其处理并写入数据库。我的消息是经过处理的,所以如果一个消费者死了,那么消息会回到队列中,让另一个消费者处理。

我们必须为 activemq 进行主动/被动配置,这导致了问题。如果我停止活动的 activemq,那么当我使用故障转移传输时,消费者会重新连接到另一个 activemq。这很好,但是在重新连接期间,消息会放回队列中,并且消费者不会意识到此重新连接并继续处理。这会导致 2 个消费者处理相同的消息。

我本来希望使用分布式事务管理器,这可能会在未来发生,但现在我需要一个不同的解决方案。

如果我不使用故障转移传输,那么我可以挂接到 JMSException 侦听器并中止使用者。不幸的是,这在使用故障转移传输时不起作用。

我想使用故障转移传输进行初始连接(发现哪些活动MQ 正在运行),然后强制故障转移不重新连接... t 重新连接...或者找个地方收听重新连接。

请注意,有时只有一台服务器使用故障转移(重新连接)时会发生这种情况。

我可以完成初始连接逻辑(寻找活动服务器),但要检查是否有其他选项

【问题讨论】:

  • 我突然想到,我可以使用发现在启动时获得有效连接

标签: spring jms activemq


【解决方案1】:

您可以使用监听器来监听 ActiveMQConnection 上的传输事件:

    connection = (ActiveMQConnection)factory.createConnection();
    connection.addTransportListener(new TransportListener() {
    public void onCommand(Object command) {
        // Do something 
    }

    public void onException(IOException error) {
        // Do something 
    }

    public void transportInterupted() {
        // Do something 
    }

    public void transportResumed() {
        // Do something 
    }
});
connection.start();

请注意,在此示例中,侦听器是直接在 Connection 上设置的;但是,您可以在 ActiveMQConnectionFactory 上设置一个实例,该实例将分配给它创建的每个 Connection 实例。

【讨论】:

  • 不幸的是,当我通过 DefaultMessageListenerContainer 从 PooledConnectionFactory 获取连接时,我没有连接句柄。
  • 查看编辑,你也可以在你的 ActiveMQConnectionFactory 上设置一个监听器。
  • PooledConnectionfactory 并没有公开它,这是 spring-boot 给你的。我可以解决这个问题,但我已经预先测试以获得有效的连接,然后监听 jms 异常。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-05-01
  • 2017-02-04
  • 2018-12-08
  • 2010-11-06
  • 2016-05-22
  • 2014-01-21
  • 1970-01-01
相关资源
最近更新 更多