【问题标题】:Is there way to build no of listener for a queue by using configuration file in AMQP有没有办法通过在 AMQP 中使用配置文件来为队列构建监听器
【发布时间】:2018-10-18 21:05:51
【问题描述】:

我已将 50K 对象发布到特定队列。我有一个监听器,它选择每个对象并处理它。但显然处理所有 50k 个对象需要更多时间。所以我想再放置 3 个可以并行处理这些对象的侦听器。为此,我是否需要再编写两个侦听器类?使用相同的代码?这将是代码的重复。有什么方法可以配置我们想要的侦听器数量,以便在内部为同一个侦听器创建实例来处理负载?任何人都可以帮助我更好的方法让更多的侦听器处理负载以增加处理。

====Rabbit mq配置文件一段代码=============

@Bean
    public SubscriberGeneralQueue1 SubscriberGeneralQueue1(){
        return new SubscriberGeneralQueue1();
    }

@Bean
        public SimpleMessageListenerContainer rpcGeneralReplyMessageListenerContainer(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter1 ) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer.setQueues(replyQueueRPC());
            simpleMessageListenerContainer.setTaskExecutor(taskExecutor());
            simpleMessageListenerContainer.setMessageListener(listenerAdapter1);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(60);
            return simpleMessageListenerContainer;
        }
       @Bean
        @Qualifier("listenerAdapter1")
        MessageListenerAdapter listenerAdapter1(SubscriberGeneralQueue1 generalReceiver) {
            return new MessageListenerAdapter(generalReceiver, "receivegeneralQueueMessage");
        }

===监听代码=================

@EnableRabbit
public class SubscriberGeneralQueue1 {

     /*@Autowired
        @Qualifier("asyncGeneralRabbitTemplate")
    private AsyncRabbitTemplate asyncGeneralRabbitTemplate;*/

    @Autowired
    private ExecutorService executorService;
    @Autowired
    private GeneralProcess generalProcess;

    List <RequestPojo> requestPojoGeneral = new ArrayList<RequestPojo>();

    @RabbitHandler
    @RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
    public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {

        long startTime=System.currentTimeMillis();

        //requestPojoGeneral.add(sampleRequestMessage);
        //System.out.println("List size issssss:" +requestPojoGeneral.size() );
        //generalProcess.processRequestObjectslist(requestPojoGeneral);
        generalProcess.processRequestObjects(sampleRequestMessage);

        System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
        System.out.println("Message payload is:" + sampleRequestMessage);
        System.out.println("Message payload1111 is:" + message );

        //return requestPojoGeneral;

    }

}

===simplemessagelistenercontainerFactory 配置===========

 @Bean
        public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setTaskExecutor(taskExecutor());
            factory.setMaxConcurrentConsumers(60);
            configurer.configure(factory, connectionFactory);
            return factory;
        }

====建议更改=====

@RabbitHandler
    @Async
    @RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
    public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {

        long startTime=System.currentTimeMillis();

        //requestPojoGeneral.add(sampleRequestMessage);
        //System.out.println("List size issssss:" +requestPojoGeneral.size() );
        //generalProcess.processRequestObjectslist(requestPojoGeneral);
        generalProcess.processRequestObjects(sampleRequestMessage);

        System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
        System.out.println("Message payload is:" + sampleRequestMessage);
        System.out.println("Message payload1111 is:" + message );

        //return requestPojoGeneral;

    }


}

配置:

@Bean
        public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setTaskExecutor(taskExecutor());
            factory.setMaxConcurrentConsumers(60);
            factory.setConsecutiveActiveTrigger(1);
            configurer.configure(factory, connectionFactory);
            return factory;
        }

  @Bean
        public SimpleMessageListenerContainer rpcGeneralReplyMessageListenerContainer(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter1 ) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer.setQueues(replyQueueRPC());
            simpleMessageListenerContainer.setTaskExecutor(taskExecutor());
            simpleMessageListenerContainer.setMessageListener(listenerAdapter1);
            simpleMessageListenerContainer.setMaxConcurrentConsumers(100);
            simpleMessageListenerContainer.setConsecutiveActiveTrigger(1);
            return simpleMessageListenerContainer;
        }

【问题讨论】:

    标签: rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    这可以通过ListenerContainerconcurrency 选项来完成:

    SimpleMessageListenerContainer 中配置的来自TaskExecutor 的线程用于在 RabbitMQ 客户端传递新消息时调用MessageListener。如果未配置,则使用 SimpleAsyncTaskExecutor。如果使用池执行器,请确保池大小足以处理配置的并发。使用DirectMessageListenerContainerMessageListener 直接在 RabbitMQ 客户端线程上调用。在这种情况下,taskExecutor 用于监控消费者的任务。

    请从这里开始阅读:https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#receiving-messages

    也可以在这里查看:https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#containerAttributes

    concurrentConsumers (concurrency) - 最初为每个侦听器启动的并发消费者数量。

    更新

    好的!我知道发生了什么。

    我们有这样的代码:

     boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
                        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                            if (receivedOk) {
                                if (isActive(this.consumer)) {
                                    consecutiveIdles = 0;
                                    if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                                        considerAddingAConsumer();
                                        consecutiveMessages = 0;
                                    }
                                }
                            }
    

    因此,我们仅在处理第一条消息后检查可能的并行性。因此,在您的情况下,它将在 1 分钟后发生。

    considerAddingAConsumer() 的另一个标志是关于 consecutiveActiveTrigger 选项,默认情况下是这样的:

    private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
    

    因此,在您的情况下,要允许恰好并行下一条消息,您还应该配置一个:

    /**
     * If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
     * {@link #maxConcurrentConsumers} has not been reached, specifies the number of
     * consecutive cycles when a single consumer was active, in order to consider
     * starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
     * This is impacted by the {@link #txSize}.
     * Default is 10 consecutive messages.
     * @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer.
     * @see #setMaxConcurrentConsumers(int)
     * @see #setStartConsumerMinInterval(long)
     * @see #setTxSize(int)
     */
    public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
        Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0");
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
    }
    

    1。因为0 无论如何也行不通。

    为了获得更好的性能,您还可以考虑使用 @Async 使您的 subscribeToRequestQueue() 真正将处理从消费者线程移交给其他线程,以避免等待另一个消费者开始的 1 分钟。

    【讨论】:

    • 谢谢比兰。我尝试通过参考上述文件来实现。
    • 嗨 Bilan 编辑了我上面的线程以添加代码。我添加了属性 simpleMessageListenerContainer.setMaxConcurrentConsumers(60);我的监听器类是 :SubscriberGeneralQueue1 。所以根据负载它会自动创建 SubscriberGeneralQueue1 实例来处理负载?
    • 没有。它将创建要并行执行的任务。您的 SubscriberGeneralQueue1 仍将作为单个实例。就这样。您应该确保您的侦听器是无状态的
    • 好的。您提到要并行执行的任务:这意味着它将执行我打算在我的 SubscriberGeneralQueue1 中执行的任务吗?我是否已经准备好并行收听和处理消息对象了? ?
    • 感谢比兰的帮助。我真的很喜欢 rabbit-mq 开发人员,因为他们支持他们的产品并帮助使用 rabbit-mq 的人。我还有一个关于性能的问题,我将在单独的线程中提出这个问题。
    猜你喜欢
    • 1970-01-01
    • 2019-11-26
    • 2015-03-28
    • 2018-05-16
    • 2020-12-06
    • 2021-02-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多