【问题标题】:How to target all nodes of an ActiveMQ Artemis cluster with Spring's DefaultMessageListenerContainer如何使用 Spring 的 DefaultMessageListenerContainer 定位 ActiveMQ Artemis 集群的所有节点
【发布时间】:2022-11-07 20:31:43
【问题描述】:

我通过 Spring 的DefaultJmsListenerContainerFactory 连接到 ActiveMQ Artemis 集群(实际上是来自 Red Hat 的 AMQ)时遇到问题。

DefaultMessageListenerContainer 仅使用一个连接,无论您通过 concurrency 参数指定的消费者数量如何。问题是,在集群中,目前配置了 3 个代理(作为开发人员,我不应该关心集群的拓扑结构)。由于这里只有一个连接,消费者只听一个代理。

为了解决这个问题,我禁用了缓存(即工厂中的setCacheLevel(CACHE_NONE))。 它“解决”了问题,因为现在我可以看到连接分布在集群的所有节点上,但这不是一个好的解决方案,因为连接会被永久删除和重新创建,这会在代理端产生很多开销(这让我觉得一棵圣诞树:D)。

你们能告诉我处理这个问题的正确方法是什么吗? 我尝试使用JmsPoolConnectionFactory,但直到现在我还没有得到任何好的结果。我仍然只有一个连接。

我正在使用带有 Artemis Starter 的 Spring Boot 2.7.4。 您可以在下面找到实际配置的代码 sn-p。

(旁注,我不使用 Spring 自动配置,因为我需要能够在 ActiveMQ Artemis 和旧的 ActiveMQ“经典”实现之间切换)。

@Bean
DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDestinationResolver(destinationResolver());
    factory.setSessionTransacted(true);
    factory.setConcurrency(config.getConcurrency());
    //Set this to allow load balancing of connections to all members of the cluster
    factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);

    final ExponentialBackOff backOff = new ExponentialBackOff(
    config.getRetry().getInitialInterval(), config.getRetry().getMultiplier());
    backOff.setMaxInterval(config.getRetry().getMaxDuration());

    factory.setBackOff(backOff);

    return factory;
}

ConnectionFactory connectionFactory() {
    return new ActiveMQJMSConnectionFactory(
    config.getUrl(), config.getUser(), config.getPassword());
}

DestinationResolver destinationResolver() {
    final ActiveMQQueue activeMQQueue = new ActiveMQQueue(config.getQueue());
    return (session, destinationName, pubSubDomain) -> activeMQQueue;
}


@JmsListener(destination = "${slp.amq.queue}")
public void processLog(String log) {
    final SecurityLog securityLog = SecurityLog.parse(log);

    fileWriter.write(securityLog);
    logsCountByApplicationId.increment(securityLog.getApplicationId());

    if (elasticClient != null) {
        elasticClient.write(securityLog);
    }
}

连接网址为:

(tcp://broker1:port,tcp://broker2:port,tcp://broker3:port)?useTopologyForLoadBalancing=true

【问题讨论】:

    标签: java spring-jms activemq-artemis


    【解决方案1】:

    可以配置集群,以便任何节点上的任何消费者都可以消费发送到任何节点的消息。因此,您不应该严格地使用您的消费者来“针对集群的所有节点”。集群中的消息重新分发和重新路由对您的应用程序应该是透明的。正如您所说,作为开发人员,您不应该关心集群的拓扑。

    也就是说,集群的目标是通过水平扩展来提高整体消息吞吐量(即性能)。此外,理想情况下,集群中的每个节点都应该有足够的生产者和消费者,这样消息就不会在集群节点之间重新分发或重新路由,因为这对性能来说不是最佳的。如果您处于只有少数消费者连接到集群的情况,那么您可能实际上并没有需要首先是一个集群。在某些用例中,单个 ActiveMQ Artemis 代理每秒可以处理数百万条消息。

    【讨论】:

    • 嗨,谢谢您的反馈。你是对的,因为性能原因我真的不需要集群。问题是我正在一个 HA 环境中部署,其中有 3 个故障域“包含”每个集群的一个节点。我也不是唯一使用它的应用程序。话虽如此,我应该能够直接在消息所在的节点上使用消息,即使拓扑能够在节点之间重新分配消息(这里,它是代理拓扑的网络)以避免网络开销。也就是说,我的方法可能完全错误:D
    • 如果您出于性能原因不使用集群,那么为什么需要“直接在它所在的节点上使用消息......以避免网络开销”?为此,您必须手动创建和管理与集群中每个节点的连接。客户端中没有任何内容可以为您执行此操作。如前所述,集群的设计使其可以将消息重新分发和重新路由给消费者,以避免饥饿,特别是客户端必须关心集群拓扑。我不认为你的建议是一个好方法。
    • 问题是我不是唯一使用这个集群的人。建议来自我们的基础设施团队,说我们应该在所有节点上制作/收听......我想分散所有应用程序的负载。如果某些生产者出于任何原因(故障转移......)坚持在节点 B 上而消费者只在节点 A 上收听,我可以想象这不是一个非常健康的情况,因为消息应该不断地从节点 B 传输到节点 A。
    • 如前所述,客户端中没有任何内容会自动创建到每个集群节点的连接。代理的集群功能被设计成不需要这样做。如果你想要这个功能,你必须自己实现它。最简单的解决方案可能是让您的应用程序实例与集群中的每个节点相对应。当然,您实施的任何解决方案都会增加客户端的复杂性,以便处理集群将自行处理的情况。同样,我认为这不是一个好方法。
    • 您的基础架构团队是否了解代理的集群功能被设计为可以将消息重新分发和重新路由给消费者以避免饥饿,从而使客户端不必关心集群拓扑?
    【解决方案2】:

    感谢您进行非常有趣的讨论! @Justin:您建议在客户端 URL 中配置什么,以确保只要集群中至少有一个节点处于活动状态,就始终为客户端提供服务?想象一下@Bidi 提到的集群中有3个节点(node1,node2和node3),将所有三个节点都放在代理URL中不是一个好习惯,例如“tcp://node1:port,tcp ://node2:port, tcp://node3:port”? 如果代理连接 URL 是 tcp://node1:port 并且该节点失败,客户端应用程序会发生什么情况?

    【讨论】:

      猜你喜欢
      • 2018-05-08
      • 2020-10-20
      • 1970-01-01
      • 1970-01-01
      • 2020-12-29
      • 1970-01-01
      • 2012-08-16
      • 2020-02-24
      • 2021-02-15
      相关资源
      最近更新 更多