【问题标题】:Rabbit MQ deisgn using spring abstraction使用 Spring 抽象的 Rabbitmq 设计
【发布时间】:2017-02-25 17:28:22
【问题描述】:

我需要从名为 Metadata 的队列中侦听消息 - 然后基于该消息,我将不得不读取一些队列,我们​​将其称为 dataQ(该队列的名称将在元数据消息中)。要读取元数据,我可以使用兔子侦听器,但之后我必须从 dataQ 读取其他消息,因此我可以通过一种方式进行手动拉取 - 但我希望拥有更清洁的东西,比如兔子侦听器,这样我就不必管理频道,确认等等。但是由于直到我们从元数据队列中读取消息后才知道队列名称,因此尝试探索其他解决方案。这个 dataQ 可以是 1000 个不同的队列名称,所以我们必须动态监听这个 dataQ。

ack 也应该像这样工作 - 从元数据队列中读取消息,处理给定的 dataQ - 为 dataQ 中的消息发送 ack(dataQ 可能有超过 1 条消息)并为元数据队列发送 ack。

(如果这对单个消费者有效,那么我可以添加容器模型并处理来自元数据队列的多个消息,这意味着我将能够同时处理多个数据队列。)


按照建议进行更新,对如何在主侦听器中获取事件感到困惑,以及该标志如何与并发一起工作(抱歉,到目前为止尚未广泛使用应用程序事件)

package com.example;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Configuration
public class MyListener {

    @Autowired
    ConnectionFactory connectionFactory;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @RabbitListener(queues = "Metadata")
    public void messageProcessing(String c) {
        System.out.println(c);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(c);
        container.setMessageListener(new MessageListenerAdapter(new DataHandler()));
        container.setApplicationEventPublisher(applicationEventPublisher);
        container.setIdleEventInterval(5000);
        container.start();

        // how to get container idle event here

        // so we can call container.stop();

    }

    public class DataHandler {
        public void handleMessage(byte[] text) {
            System.out.println("Data Received: " + text);
        }
    }

    @EventListener
    public void onApplicationEvent(ApplicationEvent event) {
        //I am getting idle event here
        System.out.println(event.getSource());
    }

}

【问题讨论】:

    标签: rabbitmq spring-amqp


    【解决方案1】:

    在元数据侦听器中启动一个新的SimpleMessageListenerContainer 来处理数据非常容易;但您无法确认来自其他侦听器的原始消息。

    您必须保持元数据线程,直到辅助侦听器完成,然后释放元数据线程,以便它确认原始消息。 您可以使用容器空闲事件来检测工作是否完成(除非您有其他机制可以知道一切都已完成)。

    在元数据侦听器容器上设置并发,以确定您希望以这种方式同时处理的数量。

    @RabbitListener(queues = "meta")
    public void handle(SomeObject message) {
    
        // extract dataQ
    
        // create a new SimpleMessageListenerContainer
        // Inject an ApplicationEventPublisher instance
        // start the container
    
        // block here, waiting for a container idle event
        // stop the container
    
        return;
    }
    

    但请记住,如果服务器崩溃,元数据消息将被重新传递(默认情况下),并且您可能已经处理了一些数据消息。

    编辑

    关于您下面的评论,我的意思是使用您自己的发布者,因此您不必弄清楚事件来自哪个容器...

    @SpringBootApplication
    public class So42459257Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So42459257Application.class, args);
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            template.convertAndSend("meta", "foo");
            template.convertAndSend("foo", "baz");
            template.convertAndSend("foo", "baz");
            template.convertAndSend("meta", "bar");
            template.convertAndSend("bar", "qux");
            template.convertAndSend("bar", "qux");
            context.getBean(So42459257Application.class).testCompleteLatch.await(10, TimeUnit.SECONDS);
            context.close();
        }
    
        private final CountDownLatch testCompleteLatch = new CountDownLatch(2);
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @RabbitListener(queues = "meta")
        public void handleMeta(final String queue) throws Exception {
            System.out.println("Started processing " + queue);
            final CountDownLatch startedLatch = new CountDownLatch(1);
            final CountDownLatch finishedLatch = new CountDownLatch(1);
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
            container.setQueueNames(queue);
            container.setMessageListener(new MessageListenerAdapter(new Object() {
    
                @SuppressWarnings("unused")
                public void handleMessage(String in) {
                    startedLatch.countDown();
                    System.out.println("Received " + in + " from " + queue);
                }
    
            }));
            container.setIdleEventInterval(5000);
            container.setApplicationEventPublisher(new ApplicationEventPublisher() {
    
                    @Override
                    public void publishEvent(Object event) {
                    }
    
                    @Override
                    public void publishEvent(ApplicationEvent event) {
                        if (event instanceof ListenerContainerIdleEvent) {
                            finishedLatch.countDown();
                    }
    
                };
            });
            container.afterPropertiesSet();
            container.start();
            if (startedLatch.await(60, TimeUnit.SECONDS)) {
                // handle container didn't receive any messages
            }
            if (finishedLatch.await(60, TimeUnit.SECONDS)) {
                // handle container didn't go idle
            }
            System.out.println("Finished processing " + queue);
            container.stop();
            this.testCompleteLatch.countDown();
        }
    
        @Bean
        public Queue meta() {
            return new Queue("meta", false, false, true);
        }
    
        @Bean
        public Queue foo() {
            return new Queue("foo", false, false, true);
        }
    
        @Bean
        public Queue bar() {
            return new Queue("bar", false, false, true);
        }
    
    }
    

    将侦听器容器并发设置为2(使用spring boot只需添加

    spring.rabbitmq.listener.concurrency=2
    

    到应用程序属性);如果您不使用引导,请自行配置工厂。

    结果:

    Started processing bar
    Started processing foo
    Received baz from foo
    Received qux from bar
    Received baz from foo
    Received qux from bar
    Finished processing bar
    Finished processing foo
    

    【讨论】:

    • 谢谢加里,我按照你的建议做了,它按照你说的工作。我用后续问题更新了问题。
    • 我的意思是使用您自己的发布者,这样您就不必弄清楚事件来自哪个容器。请参阅我的答案的编辑。
    • 这太棒了,现在我也可以用来处理来自数据 q 的 100 条消息并将消息放回元数据 q 中,这样我就不会继续处理一个数据 q。我会尽快尝试这个版本。这是完美的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-07
    • 2015-12-14
    • 2011-03-17
    相关资源
    最近更新 更多