【问题标题】:Is it possible to set prefetch count on @RabbitListener是否可以在@RabbitListener 上设置预取计数
【发布时间】:2016-09-30 04:04:40
【问题描述】:

我知道可以在此处制作SimpleMessageListenerContainer bean 并设置预取计数和消息侦听器,如下所示:

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
        ConnectionFactory rabbitConnectionFactory,
        Receiver receiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    container.setQueueNames("hello");
    container.setMessageListener(new MessageListenerAdapter(receiver, "receive"));
    container.setPrefetchCount(1000);
    return container;
}

但是,如果我想使用 @RabbitListener 的声明式方法,如何设置通道的预取计数?

@Component
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = "hello") // how to set prefetch count here?
    public void receive(String message) {
        log.info(" [x] Received '{}'.", message);
    }

}

不可能吗?

【问题讨论】:

    标签: java spring spring-amqp


    【解决方案1】:

    如果您想为每个队列使用不同的预取,您可以创建具有默认弹簧设置的侦听器工厂并仅覆盖预取。

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPrefetchCount(prefetch);
        return factory;
    }
    

    【讨论】:

      【解决方案2】:

      我使用的是 Spring boot 2.3.3,我在 application.properties 中更改了以下内容,它工作正常。

      spring.rabbitmq.listener.direct.prefetch=1000
      spring.rabbitmq.listener.simple.prefetch=1000
      

      我不知道直接和简单之间的区别,所以我都设置了。

      【解决方案3】:
         @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              factory.setConnectionFactory(connectionFactory);
              int concurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.concurrent-consumers"));
              int maxConcurrentConsumers = Integer.parseInt(PropertiesLoader.getProperty("queue.max-concurrent-consumers"));
              int consecutiveActiveTrigger = Integer.parseInt(PropertiesLoader.getProperty("queue.consecutive-active-trigger"));
              int prefectCount = Integer.parseInt(PropertiesLoader.getProperty("queue.prefetch_count"));
      
              factory.setPrefetchCount(prefectCount);
              factory.setConcurrentConsumers(concurrentConsumers);
              factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
              factory.setConsecutiveActiveTrigger(consecutiveActiveTrigger);
              return factory;
          }
      

      【讨论】:

        【解决方案4】:

        根据@artem-bilan 回答的解决方案:

        在某些@Configuration 类中声明RabbitListenerContainerFactory bean,预取计数为10:

        @Bean
        public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(rabbitConnectionFactory);
            factory.setPrefetchCount(10);
            return factory;
        }
        

        Receiver bean 使用这个工厂 bean:

        @Component
        public class Receiver {
        
            private static final Logger log = LoggerFactory.getLogger(Receiver.class);
        
            @RabbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")
            public void receive(String message) {
                log.info(" [x] Received '{}'.", message);
            }
        
            @RabbitListener(queues = "hello")
            public void receiveWithoutPrefetch(String message) {
                log.info(" [x] Received without prefetch '{}'.", message);
            }
        
        }
        

        这里的两个监听器仅用于演示目的。
        使用此配置,Spring 创建了两个 AMQP 通道。每个@RabbitListener 一个。首先使用我们的新 prefetchTenRabbitListenerContainerFactory bean 预取计数 10,然后使用默认 rabbitListenerContainerFactory bean 预取计数 1。

        【讨论】:

        • 如果你使用的是 Spring Boot 创建的默认容器工厂,你可以在 application.yml 或 application.properties 中设置属性——见boot properties appendix中的spring.rabbitmq.listener.prefetch
        【解决方案5】:

        @RabbitListenercontainerFactory 选项:

        /**
         * The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
         * to use to create the message listener container responsible to serve this endpoint.
         * <p>If not specified, the default container factory is used, if any.
         * @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
         * bean name.
         */
        String containerFactory() default "";
        

        您可以在其中使用所需的prefetchCount 配置SimpleRabbitListenerContainerFactory,并且该注释的目标SimpleMessageListenerContainer 将为您提供该选项。

        【讨论】:

        • 如果你使用的是 Spring Boot 创建的默认容器工厂,你可以在 application.yml 或 application.properties 中设置属性——见boot properties appendix中的spring.rabbitmq.listener.prefetch
        • @GaryRussell,文档没有提到任何默认值。如果没有设置,你知道预取大小吗?
        • 最近版本中为 250 - 请参阅 Spring AMQP reference
        猜你喜欢
        • 1970-01-01
        • 2010-12-16
        • 2019-07-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-07-05
        • 2020-10-04
        • 1970-01-01
        相关资源
        最近更新 更多