【发布时间】:2016-05-31 06:28:57
【问题描述】:
我有一个使用 ActiveMQ 5.10 版的 Spring JMS 应用程序。我正在执行一个简单的并发测试。我正在使用 Spring Boot、当前版本和注解来配置 JMSListener 和消息生产者。
消息生产者只是尽可能快地在队列中抛出消息。消息侦听器将消息从队列中拉出,但在收到消息后休眠 1 秒钟——模拟消息侦听器在收到消息后需要做的一些工作。
我将 JMSListener 设置为 100-1000 个并发线程。如果我同时启动消息生产者和消费者(都在自己的 JVM 中运行),消费者永远不会超过配置的最小线程数,即使最大范围设置为 1000。
如果我让生产者先启动并在队列中放置几千条消息,然后启动 1 个或更多消费者实例,它将稳定地增加线程,从每秒 100 个开始,然后每秒 20 个左右的线程,直到它得到到队列中大约有 20-30 条消息在进行中的状态。它永远不会捕获生产者——即使消费者没有接近它的 maxConcurrency 计数,队列中总会有一些消息。
为什么消息消费者不爆发成一堆额外的线程来清空队列,而不是让队列中有 20-30 条消息?消费者没有办法继续更快地添加线程以赶上队列中的消息吗?
以下是代码的相关部分。
消息制作者
@Component
public class ClientServiceImpl implements ClientService {
private static final String QUEUE="message.test.queue";
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void submitMessage(ImportantMessage importantMessage) {
System.out.println("*** Sending " + importantMessage);
jmsTemplate.convertAndSend(QUEUE, importantMessage);
}
}
消息消费者
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(AmqConsumerApplication.class, args);
}
@Value("${JMSHost}")
private String JMS_BROKER_URL;
@Autowired
static Command command;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
return factory;
}
}
这样配置的监听器...
@Component
public class TransformationListener {
private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";
@JmsListener(destination=QUEUE, concurrency = "100-1000")
public void handleRequest(ImportantMessage importantMessage) {
System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
【问题讨论】:
标签: java spring multithreading concurrency activemq