【发布时间】:2022-11-07 20:31:43
【问题描述】:
我通过 Spring 的DefaultJmsListenerContainerFactory 连接到 ActiveMQ Artemis 集群(实际上是来自 Red Hat 的 AMQ)时遇到问题。
DefaultMessageListenerContainer 仅使用一个连接,无论您通过 concurrency 参数指定的消费者数量如何。问题是,在集群中,目前配置了 3 个代理(作为开发人员,我不应该关心集群的拓扑结构)。由于这里只有一个连接,消费者只听一个代理。
为了解决这个问题,我禁用了缓存(即工厂中的setCacheLevel(CACHE_NONE))。
它“解决”了问题,因为现在我可以看到连接分布在集群的所有节点上,但这不是一个好的解决方案,因为连接会被永久删除和重新创建,这会在代理端产生很多开销(这让我觉得一棵圣诞树:D)。
你们能告诉我处理这个问题的正确方法是什么吗?
我尝试使用JmsPoolConnectionFactory,但直到现在我还没有得到任何好的结果。我仍然只有一个连接。
我正在使用带有 Artemis Starter 的 Spring Boot 2.7.4。 您可以在下面找到实际配置的代码 sn-p。
(旁注,我不使用 Spring 自动配置,因为我需要能够在 ActiveMQ Artemis 和旧的 ActiveMQ“经典”实现之间切换)。
@Bean
DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setSessionTransacted(true);
factory.setConcurrency(config.getConcurrency());
//Set this to allow load balancing of connections to all members of the cluster
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
final ExponentialBackOff backOff = new ExponentialBackOff(
config.getRetry().getInitialInterval(), config.getRetry().getMultiplier());
backOff.setMaxInterval(config.getRetry().getMaxDuration());
factory.setBackOff(backOff);
return factory;
}
ConnectionFactory connectionFactory() {
return new ActiveMQJMSConnectionFactory(
config.getUrl(), config.getUser(), config.getPassword());
}
DestinationResolver destinationResolver() {
final ActiveMQQueue activeMQQueue = new ActiveMQQueue(config.getQueue());
return (session, destinationName, pubSubDomain) -> activeMQQueue;
}
@JmsListener(destination = "${slp.amq.queue}")
public void processLog(String log) {
final SecurityLog securityLog = SecurityLog.parse(log);
fileWriter.write(securityLog);
logsCountByApplicationId.increment(securityLog.getApplicationId());
if (elasticClient != null) {
elasticClient.write(securityLog);
}
}
连接网址为:
(tcp://broker1:port,tcp://broker2:port,tcp://broker3:port)?useTopologyForLoadBalancing=true
【问题讨论】:
标签: java spring-jms activemq-artemis