【问题标题】:Spring - plain RabbitMQ a lot faster than plain RabbitMQ + JMS?Spring - 普通 RabbitMQ 比普通 RabbitMQ + JMS 快很多?
【发布时间】:2018-11-28 17:25:08
【问题描述】:

我有 2 个 Spring RabbitMq 配置,一个使用 RabbitTemplate,一个使用 JmsTemplate。


RabbitTemplate 的配置:

AmqpMailIntegrationPerfTestConfig 类

@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("mail", MailMessage.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    Queue queue() {
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(createConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

}

com.test.perf.amqp.sender 包中的 AmqpMailSenderPerfImpl 类

@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, message);
        return true;
    }
}

com.test.perf.amqp.receiver 包中的 AmqpMailReceiverPerfImpl 类

@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
    public void receiveMessage(MailMessage message) {
        logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
        datesReceived.put(message.getSubject(), new Date());
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

JmsTemplate 的配置:

JmsMailIntegrationPerfTestConfig 类

@Configuration
@EnableJms
@ComponentScan(basePackages = {
        "com.test.perf.jms.receiver",
        "com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

        Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
        typeIdMappings.put("mail", MailMessage.class);
        converter.setTypeIdMappings(typeIdMappings);

        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");

        return converter;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);

        return connectionFactory;
    }

    @Bean(name = "myJmsFactory")
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("10-50");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public Destination jmsDestination() {
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setDestinationName("myQueue");
        jmsDestination.setAmqp(false);
        jmsDestination.setAmqpQueueName("mails");
        return jmsDestination;
    }

    @Bean
    public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        return jmsTemplate;
    }

}

com.test.jms.sender 包中的 JmsMailSenderImpl 类

@Component
public class JmsMailSenderImpl implements MailSender {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);

        return false;
    }

}

com.test.perf.jms.receiver 包中的 JmsMailReceiverPerfImpl 类

@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
    public void receiveMail(MailMessage message) {
        datesReceived.put(message.getSubject(), new Date());
        logger.info("Received <" + message.getSubject() + ">");
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

我通过启动 10 个线程并让各个 MailSender 分别发送 1000 封邮件来测试上述配置。

对于带有 RabbitTemplate 的配置,我得到: * 所有消息的总吞吐时间:3687ms * 处理一条消息的时间:817ms

对于带有 JmsTemplate 的配置,我得到: * 所有消息的总吞吐时间:41653ms * 处理一条消息的时间:67ms

这似乎表明带有 JmsTemplate 的版本没有并行工作,或者至少没有最佳地使用资源。

有人知道是什么原因造成的吗?我尝试了不同的事务和并发参数,但无济于事。

我们想要使用 JmsTemplate 获得与使用 RabbitTemplate 相同的吞吐量时间,因此我们可以将 JMS 用作抽象层。

【问题讨论】:

  • 我不是在争论这个问题,但你的数学有问题 - 总时间超过 10 倍,但每条消息的时间少于 1/10。
  • 数字是正确的。使用 jms 侦听器处理消息的时间确实比使用 rabbit 侦听器要短很多。这似乎违反直觉,但也许它只是意味着在第二种情况下实际上没有以某种方式并发执行?
  • 有趣;那么“处理一条消息的时间”是什么意思?我假设它是was total time / # messages,在这种情况下,您将只使用 Rabbit 处理 4 条消息,使用 JMS 处理 621 条消息。你什么时候开始/停止每条消息的时钟? Spring 框架基础结构(在 JMS/RabbitMQ 客户端之上)对于 RabbitListener 和 JmsListener 几乎相同,所以我希望得到类似的结果。

标签: java rabbitmq spring-jms spring-rabbit


【解决方案1】:

我明白为什么消费者端速度较慢 - Consumer.receive() 对每条消息使用同步 basicGet(),而 @RabbitListener 容器使用预取计数为 250 的 basicConsume

在JMS发送端,你需要使用CachingConnectionFactory,否则每次发送都会创建一个新的会话/生产者/通道。

尽管如此,它仍然会慢一些;我建议你在 RabbitMQ 工程师常去的 rabbitmq-users 谷歌群组中询问。他们维护 JMS 客户端。

【讨论】:

  • 根据您的建议,我们能够获得一些重大的性能改进。不过,使用 JMS 侦听器确实比使用 RabbitListener 慢 2-3。我会在你提到的小组中询问更多信息。
猜你喜欢
  • 1970-01-01
  • 2023-02-15
  • 2011-06-01
  • 1970-01-01
  • 1970-01-01
  • 2011-09-16
  • 2018-12-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多