【发布时间】:2018-01-28 23:30:24
【问题描述】:
我正在我的应用程序中实现 ActiveMQ。我有一个用例,当远程侦听队列中没有消息时,我需要关闭 activeMQ 连接,并且在有消息时必须重新打开连接。我知道仅设置 maxInactivityDuration=30000 不会帮助我实现此用例,但至少在经过 30 秒超时设置后它不会关闭连接。请让我知道这方面出了什么问题,并让我知道如何实现我的用例。谢谢你的解决方案。
@Configuration
@EnableJms
@ImportResource("classpath*:beans.xml")
public class MessagingConfiguration {
@Value("${activemq.broker.url}")
private String BROKER_URL = "tcp://localhost:61616?wireFormat.maxInactivityDuration=30000";
@Value("${activemq.request.queue}")
private String REQUEST_QUEUE = "test.request";
@Value("${activemq.response.queue}")
private String RESPONSE_QUEUE = "test.response";
@Value("${activemq.borker.username}")
private String BROKER_USERNAME = "admin";
@Value("${activemq.borker.password}")
private String BROKER_PASSWORD = "admin";
@Autowired
ListenerClass messageListener;
@Autowired
JmsExceptionListener jmsExceptionListener;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setUserName(BROKER_USERNAME);
connectionFactory.setPassword(BROKER_PASSWORD);
connectionFactory.setTrustAllPackages(true);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setBackOffMultiplier(3); // Wait 5 seconds first re-delivery, then 15, 45 seconds
redeliveryPolicy.setInitialRedeliveryDelay(5000);
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
}
@Bean
public ConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setTargetConnectionFactory(connectionFactory());
connectionFactory.setExceptionListener(jmsExceptionListener);
connectionFactory.setSessionCacheSize(100);
connectionFactory.setCacheConsumers(false);
connectionFactory.setCacheProducers(false);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestinationName(REQUEST_QUEUE);
template.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
template.setMessageConverter(converter());
return template;
}
@Bean
public DefaultMessageListenerContainer jmsListenerContainer() {
DefaultMessageListenerContainer factory = new DefaultMessageListenerContainer();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-1");
factory.setDestinationName(RESPONSE_QUEUE);
factory.setMessageListener(messageListener);
factory.setExceptionListener(jmsExceptionListener);
return factory;
}
@Bean
MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
控制台日志:
Aug 20, 2017 5:24:47 PM org.apache.catalina.startup.Catalina start
INFO: Server startup in 61255 ms
17:24:52.216 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:24:52.216 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:53.145 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:24:53.145 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:56.134 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:24:56.134 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:24:56.336 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:24:56.336 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:00.252 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:25:00.257 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:02.217 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:02.217 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:03.145 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30001ms elapsed since last read check.
17:25:03.146 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:03.146 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:03.262 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
17:25:03.262 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:06.133 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30001ms elapsed since last read check.
17:25:06.135 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:06.135 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:06.337 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:06.337 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
17:25:10.253 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
17:25:10.253 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
【问题讨论】:
-
代理传输的 maxInactivityDuration 设置为多少?如果它为零,您将不走运,因为代理将采用客户端和代理设置中的最小值,而零表示“禁用非活动计时器”。至于实质性问题——当您说您希望在没有消息的情况下关闭连接时,您的意思是希望 broker 关闭它,还是希望 client 关闭它?
-
当前设置为 30 秒 (tcp://localhost:61616?wireFormat.maxInactivityDuration=30000)。我希望代理关闭连接并在侦听器队列中有消息时重新打开。
-
maxInactivityDuration 可以在连接的代理和客户端上设置。将采用最低值。如果其中一个值为零,则它将始终是最低的,并且禁用不活动监视。我不知道您的代理是如何设置的,所以我不知道在传输连接器上设置 maxInactivityDuration 的可能性有多大。
-
与代理的连接必须由客户端发起。如果代理在没有可用消息时关闭了客户端的连接,那只会让客户端陷入无连接的状态。也许您需要重新工作客户端,以便它使用超时接收?这样,客户端就会知道消息何时可用,并可以相应地管理其连接?
标签: java activemq message-queue spring-jms