【问题标题】:ActiveMQ InactivityMonitor is not closing after elapsing maxInactivityDuration setting经过 maxInactivityDuration 设置后 ActiveMQ InactivityMonitor 未关闭
【发布时间】:2018-01-28 23:30:24
【问题描述】:

我正在我的应用程序中实现 ActiveMQ。我有一个用例,当远程侦听队列中没有消息时,我需要关闭 activeMQ 连接,并且在有消息时必须重新打开连接。我知道仅设置 maxInactivityDuration=30000 不会帮助我实现此用例,但至少在经过 30 秒超时设置后它不会关闭连接。请让我知道这方面出了什么问题,并让我知道如何实现我的用例。谢谢你的解决方案。

@Configuration
@EnableJms
@ImportResource("classpath*:beans.xml")
public class MessagingConfiguration {   


    @Value("${activemq.broker.url}")
    private String BROKER_URL = "tcp://localhost:61616?wireFormat.maxInactivityDuration=30000";

    @Value("${activemq.request.queue}")
    private String REQUEST_QUEUE = "test.request";

    @Value("${activemq.response.queue}")
    private String RESPONSE_QUEUE = "test.response";

    @Value("${activemq.borker.username}")
    private String BROKER_USERNAME = "admin";

    @Value("${activemq.borker.password}")
    private String BROKER_PASSWORD = "admin"; 

    @Autowired
    ListenerClass messageListener;

    @Autowired
    JmsExceptionListener jmsExceptionListener;

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        connectionFactory.setUserName(BROKER_USERNAME);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setTrustAllPackages(true);
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return connectionFactory;
    }

    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setBackOffMultiplier(3); // Wait 5 seconds first re-delivery, then 15, 45 seconds
        redeliveryPolicy.setInitialRedeliveryDelay(5000);
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setUseExponentialBackOff(true);
        return redeliveryPolicy;
    }

    @Bean
    public ConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setTargetConnectionFactory(connectionFactory());
        connectionFactory.setExceptionListener(jmsExceptionListener);
        connectionFactory.setSessionCacheSize(100);
        connectionFactory.setCacheConsumers(false);
        connectionFactory.setCacheProducers(false);
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultDestinationName(REQUEST_QUEUE);
        template.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
        template.setMessageConverter(converter());
        return template;
    }

    @Bean
    public DefaultMessageListenerContainer jmsListenerContainer() {
        DefaultMessageListenerContainer factory = new DefaultMessageListenerContainer();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setDestinationName(RESPONSE_QUEUE);
        factory.setMessageListener(messageListener);
        factory.setExceptionListener(jmsExceptionListener);
        return factory;
    }

    @Bean
    MessageConverter converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

}

控制台日志:

Aug 20, 2017 5:24:47 PM org.apache.catalina.startup.Catalina start
INFO: Server startup in 61255 ms
17:24:52.216 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:24:52.216 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:53.145 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:24:53.145 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:56.134 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:24:56.134 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:56.336 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:24:56.336 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:00.252 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:25:00.257 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:02.217 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:02.217 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:03.145 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30001ms elapsed since last read check.
17:25:03.146 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:03.146 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:03.262 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:25:03.262 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:06.133 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30001ms elapsed since last read check.
17:25:06.135 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:06.135 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:06.337 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:06.337 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:10.253 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:10.253 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]

【问题讨论】:

  • 代理传输的 maxInactivityDuration 设置为多少?如果它为零,您将不走运,因为代理将采用客户端和代理设置中的最小值,而零表示“禁用非活动计时器”。至于实质性问题——当您说您希望在没有消息的情况下关闭连接时,您的意思是希望 broker 关闭它,还是希望 client 关闭它?
  • 当前设置为 30 秒 (tcp://localhost:61616?wireFormat.maxInactivityDuration=30000)。我希望代理关闭连接并在侦听器队列中有消息时重新打开。
  • maxInactivityDuration 可以在连接的代理和客户端上设置。将采用最低值。如果其中一个值为零,则它将始终是最低的,并且禁用不活动监视。我不知道您的代理是如何设置的,所以我不知道在传输连接器上设置 maxInactivityDuration 的可能性有多大。
  • 与代理的连接必须由客户端发起。如果代理在没有可用消息时关闭了客户端的连接,那只会让客户端陷入无连接的状态。也许您需要重新工作客户端,以便它使用超时接收?这样,客户端就会知道消息何时可用,并可以相应地管理其连接?

标签: java activemq message-queue spring-jms


【解决方案1】:

看起来根据您的代码,您已经为连接注册了一个侦听器,因此它不会处于空闲状态,因此不会被关闭。

但是如果你想强制关闭连接,那么你可能需要为你的连接设置 maxTimeout。

【讨论】:

  • 你能告诉我确切的设置吗,因为我找不到这个设置。我尝试了故障转移传输 (timeout=3000) 设置,但没有成功。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-01-26
  • 2019-03-27
  • 2020-11-30
  • 1970-01-01
  • 2019-03-17
  • 1970-01-01
  • 2012-10-23
相关资源
最近更新 更多