【问题标题】:Increase message consumption rate提高消息消费率
【发布时间】:2020-05-10 22:40:44
【问题描述】:

我们使用 ActiveMQ 作为我们的消息代理。

对于其中一个队列,我们​​发现生成消息的速率远高于消耗消息的速率。这有时会导致 ActiveMQ 崩溃。

因此,我们正在探索提高消耗率的选项。 (首要任务是提高现有消费者的比率,然后增加消费者 pod/实例的数量)。

以下是当前监听配置

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency("1-1");

    factory.setMessageConverter(jacksonJmsMessageConverter());
    return factory;
}

侦听器逻辑与数据库多次交互,可以限定为 I/O 绑定任务。所以我们正在考虑并行处理多条消息。

我们找到了以下选项。

  1. 我们已将并发设置为1-1,这意味着 concurrentConsumersmaxConcurrentConsumers 为 1,一次只处理 1 条消息。这是我们可以增加为 5-10 的配置,这样最少 5 个和最多 10 个消费者将能够同时操作。

  2. 我们还发现在监听器工厂中也可以设置TaskExecutor。像设置一个threadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50) 也可以帮助我们并发处理消息。

  3. 我不确定使用哪个选项(1 或 2)

  4. 如果我们将并发增加到 5-10 并设置一个 ThreadPoolExecutor,会有什么行为和影响?
  5. 在选择参数的最佳值方面有什么建议吗?

【问题讨论】:

标签: java jms activemq spring-jms


【解决方案1】:

我同意前面的答案。

当您使用 Spring DefaultMessageListenerContainer (DMLC) 的消息消费性能较差时,这几乎总是意味着每次读取的消息都会重新创建 JMS 使用者/会话/连接。

在非交易的情况下,您可以让 DMLC 缓存消费者。 DMLC 将使用单个 JMS 连接,并在该连接上启动多个 JMS 会话。每个会话都有一个 JMS 消息使用者。

在事务处理的情况下,您将在 DMLC 中没有缓存,并且必须使用缓存连接工厂以避免性能问题。查看org.apache.activemq.pool.PooledConnectionFactoryorg.messaginghub.pooled.jms.JmsPoolConnectionFactory 了解此功能。

至于你的问题,我也不会打扰TaskExecutor。我建议从5-5 开始并发。我发现让消费者的最小和最大数量相同(只是有一个固定的池)在稳定性和性能方面具有优势。

关于 ActiveMQ 崩溃,这是由于 ActiveMQ 预取。默认情况下,ActiveMQ 的预取值为 1000。JMS 消息消费者请求消息,代理在预取中传递消息和其他 999 个消息。如果消费者/会话随后关闭,则 999 条消息在客户端被丢弃,然后在代理上重新排队。这对经纪人来说是非常虐待的,并且没有得到很好的处理。

另外,请注意,例如,如果您有 5 个并发消费者,那么第一个消费者将获得 1000 条消息,然后下一个消费者将获得 1000 条消息,依此类推。因此,如果队列中只有 500 条消息,那么只有一个消费者处于活动状态。您需要在队列中有 5000 条消息才能激活所有 5 个消费者。

如有疑问,请在客户端配置中禁用消息预取:

tcp://broker_uri:61616?jms.prefetchPolicy.all=0

【讨论】:

    【解决方案2】:

    DefaultMessageListenerContainer.setTaskExecutor() 的 JavaDoc 是这样说的:

    设置 Spring TaskExecutor 以用于运行侦听器线程。

    默认为SimpleAsyncTaskExecutor,根据指定的并发消费者数量启动多个新线程。

    指定替代TaskExecutor 以与现有线程池集成。请注意,这仅在以特定方式管理线程(例如在 Java EE 环境中)时才真正增加价值。普通线程池不会增加太多价值,因为此侦听器容器将在其整个生命周期内占用多个线程。

    因此,设置任务执行器的主要用例是与现有线程池集成。默认执行器将根据您配置的并发消费者数量进行扩展。因此,我不建议您设置任务执行器。

    我建议简单地设置并发。

    您需要通过仔细的基准测试和分析来确定您的最佳值。在您开始该过程之前,我强烈建议您设定一个具体的性能目标,因为如果没有目标,基准测试和优化可能会成为无休止的任务。

    您也可以考虑使用flow control for the producers,这样一开始队列就不会那么满。

    【讨论】:

      【解决方案3】:

      您观察到的吞吐量(以 msg/s 和 MB/s 为单位)是多少?

      根据我的经验,您将观察到使用直接 JMS API 在 PooledConnectionFactory 上批处理事务的吞吐量最快。 Spring JMS 模板可能难以配置和调整,尤其是多线程和事务。它还可以在每条消息上关闭消费者和会话等对象,从而消除代理端缓存和预取的好处。

      当您直接使用 JMS-API 和批处理事务(JMS,不一定是 XA)时,您将受益于 ActiveMQ 代理的服务器端缓存和预取,这会产生巨大的差异。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-03-19
        • 1970-01-01
        • 2023-02-17
        • 2015-11-25
        • 2021-12-02
        • 1970-01-01
        • 2015-07-23
        相关资源
        最近更新 更多