【问题标题】:How do I make Spring JMSListener burst to max concurrent threads?如何使 Spring JMSListener 爆发到最大并发线程?
【发布时间】:2016-05-31 06:28:57
【问题描述】:

我有一个使用 ActiveMQ 5.10 版的 Spring JMS 应用程序。我正在执行一个简单的并发测试。我正在使用 Spring Boot、当前版本和注解来配置 JMSListener 和消息生产者。

消息生产者只是尽可能快地在队列中抛出消息。消息侦听器将消息从队列中拉出,但在收到消息后休眠 1 秒钟——模拟消息侦听器在收到消息后需要做的一些工作。

我将 JMSListener 设置为 100-1000 个并发线程。如果我同时启动消息生产者和消费者(都在自己的 JVM 中运行),消费者永远不会超过配置的最小线程数,即使最大范围设置为 1000。

如果我让生产者先启动并在队列中放置几千条消息,然后启动 1 个或更多消费者实例,它将稳定地增加线程,从每秒 100 个开始,然后每秒 20 个左右的线程,直到它得到到队列中大约有 20-30 条消息在进行中的状态。它永远不会捕获生产者——即使消费者没有接近它的 maxConcurrency 计数,队列中总会有一些消息。

为什么消息消费者不爆发成一堆额外的线程来清空队列,而不是让队列中有 20-30 条消息?消费者没有办法继续更快地添加线程以赶上队列中的消息吗?

以下是代码的相关部分。

消息制作者

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE="message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void submitMessage(ImportantMessage importantMessage) {

        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);

    }
}

消息消费者

@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    @Autowired
    static Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
        ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
        ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
        ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
        return factory;
    }

}

这样配置的监听器...

@Component
public class TransformationListener {

    private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination=QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

【问题讨论】:

    标签: java spring multithreading concurrency activemq


    【解决方案1】:

    你还在面对这种行为吗? 您是否在http://activemq.apache.org/what-is-the-prefetch-limit-for.html 上阅读过“Pooled Consumers and prefetch”这个建议? 您是否尝试过 prefetchSize=0 或 1 ?我认为1可以解决您的问题。 如果 prefetchSize > 1,您可能需要将 AbortSlowAckConsumerStrategy 降低到低于默认值 30 秒。 在您的情况下,要让超过 100 个线程使用消息,您需要超过 1000 条未使用且未在队列中预取的消息,因为 prefetchSize 为 10。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多