【发布时间】:2018-11-28 17:25:08
【问题描述】:
我有 2 个 Spring RabbitMq 配置,一个使用 RabbitTemplate,一个使用 JmsTemplate。
RabbitTemplate 的配置:
AmqpMailIntegrationPerfTestConfig 类:
/**
 * Spring configuration for the RabbitTemplate-based variant of the mail performance test.
 *
 * <p>Scans the AMQP receiver and sender packages, enables {@code @RabbitListener} handling and
 * declares the converter, template, connection factory, queue/exchange/binding, admin and
 * listener container factory beans.
 *
 * <p>NOTE(review): queue, exchange and routing key constants are read from
 * {@code AmqpMailSenderImpl}, which is not shown alongside this class; the sender shown for this
 * test is {@code AmqpMailSenderPerfImpl} - confirm both classes agree on these values.
 */
@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    /** Class mapper that resolves the type id {@code "mail"} to {@link MailMessage}. */
    @Bean
    public DefaultClassMapper classMapper() {
        Map<String, Class<?>> typeIds = new HashMap<>();
        typeIds.put("mail", MailMessage.class);

        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setIdClassMapping(typeIds);
        return mapper;
    }

    /** JSON converter that uses {@link #classMapper()} for type resolution. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    /** Template for publishing; payloads are converted with {@link #jsonMessageConverter()}. */
    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /** Caching connection factory pointing at the broker on {@code localhost}. */
    @Bean
    public ConnectionFactory createConnectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** Queue the listener consumes from; declared with the second constructor flag set to false. */
    @Bean
    Queue queue() {
        boolean durable = false;
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, durable);
    }

    /** Topic exchange the mails are published to. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    /** Binds {@link #queue()} to {@link #exchange()} under the sender's routing key. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    /** Admin built on the same connection factory bean. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    /**
     * Container factory referenced by name from the {@code @RabbitListener}; allows up to five
     * concurrent consumers and converts payloads with {@link #jsonMessageConverter()}.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(createConnectionFactory());
        containerFactory.setMessageConverter(jsonMessageConverter());
        containerFactory.setMaxConcurrentConsumers(5);
        return containerFactory;
    }
}
com.test.perf.amqp.sender 包中的 AmqpMailSenderPerfImpl 类:
/**
 * {@link MailSender} that publishes mails to RabbitMQ through Spring's {@link RabbitTemplate}.
 */
@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    /** Name of the exchange every mail is published to. */
    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";

    /** Routing key used for every published mail. */
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the mail with the template's message converter and publishes it.
     *
     * @param message the mail to publish
     * @return always {@code true} once the publish call has returned
     */
    @Override
    public boolean sendMail(MailMessage message) {
        rabbitTemplate.convertAndSend(
            TOPIC_EXCHANGE_NAME,
            ROUTING_KEY,
            message);
        return true;
    }
}
com.test.perf.amqp.receiver 包中的 AmqpMailReceiverPerfImpl 类:
/**
 * RabbitMQ listener that records, per mail subject, the time at which the mail was received.
 */
@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    // The referenced container factory allows several concurrent consumers, so receiveMessage
    // may run on multiple threads at once. A plain HashMap is not safe for concurrent writes;
    // the synchronized wrapper keeps HashMap semantics (including null keys) while making each
    // put atomic.
    private final Map<String, Date> datesReceived =
        java.util.Collections.synchronizedMap(new HashMap<String, Date>());

    /**
     * Logs the received mail and stores the current time under its subject.
     *
     * @param message the converted mail payload
     */
    @RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
    public void receiveMessage(MailMessage message) {
        logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
        datesReceived.put(message.getSubject(), new Date());
    }

    /**
     * Returns the live (synchronized) map of subject to receive time.
     *
     * <p>Callers that iterate over the map while listeners may still be running must
     * synchronize on the returned map for the duration of the iteration.
     *
     * @return map from mail subject to the time it was received
     */
    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }
}
JmsTemplate 的配置:
JmsMailIntegrationPerfTestConfig 类:
/**
 * Spring configuration for the JmsTemplate-based variant of the mail performance test.
 *
 * <p>Enables {@code @JmsListener} handling, scans the JMS receiver and sender packages and
 * declares the converter, connection factory, listener container factory, destination and
 * template beans.
 */
@Configuration
@EnableJms
@ComponentScan(basePackages = {
    "com.test.perf.jms.receiver",
    "com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {

    /**
     * Jackson converter producing text messages; the type id {@code "mail"} is written to the
     * {@code _type} property and resolved to {@link MailMessage}.
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        Map<String, Class<?>> typeIds = new HashMap<>();
        typeIds.put("mail", MailMessage.class);

        MappingJackson2MessageConverter jackson = new MappingJackson2MessageConverter();
        jackson.setTypeIdMappings(typeIds);
        jackson.setTargetType(MessageType.TEXT);
        jackson.setTypeIdPropertyName("_type");
        return jackson;
    }

    /** RabbitMQ JMS connection factory for {@code guest@localhost:5672}, virtual host {@code /}. */
    @Bean
    public ConnectionFactory createConnectionFactory() {
        RMQConnectionFactory rmqFactory = new RMQConnectionFactory();
        rmqFactory.setHost("localhost");
        rmqFactory.setPort(5672);
        rmqFactory.setVirtualHost("/");
        rmqFactory.setUsername("guest");
        rmqFactory.setPassword("guest");
        return rmqFactory;
    }

    /**
     * Listener container factory registered as {@code myJmsFactory}, with a concurrency range of
     * {@code 10-50} and the Jackson converter.
     */
    @Bean(name = "myJmsFactory")
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory containerFactory =
            new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(jacksonJmsMessageConverter());
        containerFactory.setConcurrency("10-50");
        return containerFactory;
    }

    /**
     * Destination named {@code myQueue} with AMQP queue name {@code mails}.
     *
     * <p>NOTE(review): the sender and listener shown with this configuration address the
     * destination by the name {@code "mailbox"}; this bean does not appear to be used by them -
     * confirm whether it is still needed.
     */
    @Bean
    public Destination jmsDestination() {
        RMQDestination destination = new RMQDestination();
        destination.setDestinationName("myQueue");
        destination.setAmqp(false);
        destination.setAmqpQueueName("mails");
        return destination;
    }

    /**
     * Template used for sending; payloads are converted with
     * {@link #jacksonJmsMessageConverter()}.
     *
     * <p>NOTE(review): the template is built directly on the injected connection factory, which
     * here is the plain {@code RMQConnectionFactory}. The JmsTemplate documentation recommends a
     * pooling/caching factory (e.g. Spring's {@code CachingConnectionFactory}) because resources
     * are otherwise obtained per operation - this may explain low send throughput; confirm by
     * measuring before changing the wiring.
     */
    @Bean
    public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setMessageConverter(jacksonJmsMessageConverter());
        return template;
    }
}
com.test.jms.sender 包中的 JmsMailSenderImpl 类:
/**
 * {@link MailSender} that publishes mails through Spring's {@link JmsTemplate}.
 */
@Component
public class JmsMailSenderImpl implements MailSender {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Converts the mail with the template's message converter and sends it to the
     * {@code "mailbox"} destination.
     *
     * @param message the mail to send
     * @return {@code true} once the send call has returned without throwing
     */
    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);
        // Report success after a completed send, consistent with AmqpMailSenderPerfImpl;
        // previously this returned false even though the message had been sent.
        return true;
    }
}
com.test.perf.jms.receiver 包中的 JmsMailReceiverPerfImpl 类:
/**
 * JMS listener that records, per mail subject, the time at which the mail was received.
 */
@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    // The listener below is declared with concurrency = "10", so receiveMail runs on several
    // threads at once. A plain HashMap is not safe for concurrent writes; the synchronized
    // wrapper keeps HashMap semantics (including null keys) while making each put atomic.
    private final Map<String, Date> datesReceived =
        java.util.Collections.synchronizedMap(new HashMap<String, Date>());

    /**
     * Stores the current time under the mail's subject and logs the reception.
     *
     * @param message the converted mail payload
     */
    @JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
    public void receiveMail(MailMessage message) {
        datesReceived.put(message.getSubject(), new Date());
        logger.info("Received <" + message.getSubject() + ">");
    }

    /**
     * Returns the live (synchronized) map of subject to receive time.
     *
     * <p>Callers that iterate over the map while listeners may still be running must
     * synchronize on the returned map for the duration of the iteration.
     *
     * @return map from mail subject to the time it was received
     */
    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }
}
我通过启动 10 个线程并让各个 MailSender 分别发送 1000 封邮件来测试上述配置。
对于带有 RabbitTemplate 的配置,我得到: * 所有消息的总吞吐时间:3687ms * 处理一条消息的时间:817ms
对于带有 JmsTemplate 的配置,我得到: * 所有消息的总吞吐时间:41653ms * 处理一条消息的时间:67ms
这似乎表明带有 JmsTemplate 的版本没有并行工作,或者至少没有最佳地使用资源。
有人知道是什么原因造成的吗?我尝试了不同的事务和并发参数,但无济于事。
我们希望使用 JmsTemplate 时能获得与使用 RabbitTemplate 相同的总吞吐时间,这样我们就可以将 JMS 用作抽象层。
【问题讨论】:
-
我并不是质疑这个问题本身,但你的数字对不上——总时间是 10 倍以上,而每条消息的处理时间却不到 1/10。
-
数字是正确的。使用 jms 侦听器处理消息的时间确实比使用 rabbit 侦听器要短很多。这似乎违反直觉,但也许它只是意味着在第二种情况下实际上没有以某种方式并发执行?
-
有趣;那么“处理一条消息的时间”是什么意思?我原以为它是
“总时间 / 消息数”,但那样的话,您用 Rabbit 只处理了约 4 条消息,用 JMS 处理了约 621 条消息。您是在什么时候开始/停止对每条消息计时的? Spring 框架基础结构(在 JMS/RabbitMQ 客户端之上)对于 RabbitListener 和 JmsListener 几乎相同,所以我预期会得到类似的结果。
标签: java rabbitmq spring-jms spring-rabbit