【问题标题】:How to make several threads do takes from RabbitMQ queue using Spring Boot?如何使用 Spring Boot 从 RabbitMQ 队列中获取多个线程?
【发布时间】:2019-02-21 09:03:16
【问题描述】:

我们的应用程序使用 RabbitMQ 提供的多个队列中的数据。为了增加吞吐量,我们为每个队列启动多个线程,这些线程从这些队列中进行阻塞获取。

对于一个新服务,我们希望使用 Spring Boot,并且每个队列有多个线程从这些队列中获取数据。这是用于处理从某个队列到达的数据的规范 Spring Boot 代码:

@StreamListener(target = Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<SomeData> process(Message<SomeData> message) {
    SomeData result = service.process(message.getPayload());
    return MessageBuilder
            .withPayload(result)
            .copyHeaders(message.getHeaders())
            .build();
}

现在的问题是如何让 Spring Boot 产生多个线程来服务一个队列而不是单个线程。吞吐量对我们的应用程序非常关键,因此需要这样做。

【问题讨论】:

    标签: java spring-boot rabbitmq


    【解决方案1】:

    查看available properties,搜索rabbitmq。

    spring.rabbitmq.listener.simple.concurrency= # 最小监听调用线程数

    看起来很有希望

    【讨论】:

    • 谢谢,托马斯。我想知道是否有办法为单个队列设置并发,例如类似 spring.rabbitmq.listener.simple.concurrency.myqueue
    • 我不这么认为,至少不是开箱即用。我相信实际上听众不会绑定到队列,而是绑定到整个经纪人,然后说出他们对哪些队列感兴趣(但这只是我的猜测)
    • 此配置运行良好,即使第一个消息仍在处理中,它也可以使用多条消息
    【解决方案2】:

    您可以在配置队列时为队列设置并发消费者。

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory
            (MessageConverter contentTypeConverter,
             SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    
        // the number of consumers is set as 5
        factory.setConcurrentConsumers(5);
    
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(contentTypeConverter);
        return factory;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-10-05
      • 1970-01-01
      • 1970-01-01
      • 2021-04-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多