【问题标题】:Problem with RabbitMQ Direct reply-to with Spring使用 Spring 直接回复 RabbitMQ 的问题
【发布时间】:2020-01-15 23:03:49
【问题描述】:

我正在开发一个将消息发送到服务器的应用程序,然后修改给定的消息并使用 Direct Reply-to 将其发送回amq.rabbitmq.reply-to 队列。我已经按照教程https://www.rabbitmq.com/direct-reply-to.html 进行了操作,但是我在实现它时遇到了一些问题。在我的情况下,据我了解,我需要在无确认模式下使用来自伪队列amq.rabbitmq.reply-to 的消息,在我的情况下是MessageListenerContainer。这是我的配置:

@Bean
    public Jackson2JsonMessageConverter messageConverter() {    
        ObjectMapper mapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setReplyAddress("amq.rabbitmq.reply-to");
        return rabbitTemplate;
    }

    @Bean
    MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory ) {       
        DirectMessageListenerContainer directMessageListenerContainer = new DirectMessageListenerContainer();
        directMessageListenerContainer.setConnectionFactory(connectionFactory);
        directMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
        directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");
        directMessageListenerContainer.setMessageListener(new PracticalMessageListener());
        return directMessageListenerContainer;

    }

消息通过 STOM 协议上的 SEND 帧以 JSON 格式发送并转换。然后是一个新队列
动态创建并添加到 MessageListenerContainer。因此,当消息到达代理时,我想在服务器端对其进行修改并发送回amq.rabbitmq.reply-to,并将原始消息发送到路由键messageTemp.getTo(),该路由键在 STOMP 的 SUBSCRIBE 帧上订阅。

  @MessageMapping("/private")
  public void send2(MessageTemplate messageTemp) throws Exception {
      MessageTemplate privateMessage = new MessageTemplate(messageTemp.getPerson(),
              messageTemp.getMessage(), 
              messageTemp.getTo());

     AbstractMessageListenerContainer abstractMessageListenerContainer =
              (AbstractMessageListenerContainer) mlc;

       // here's the queue added to listener container   
      abstractMessageListenerContainer.addQueueNames(messageTemp.getTo());

      MessageProperties mp = new MessageProperties();
      mp.setReplyTo("amq.rabbitmq.reply-to");
      mp.setCorrelationId("someId");

      Jackson2JsonMessageConverter smc = new Jackson2JsonMessageConverter();
      Message message = smc.toMessage(messageTemp, mp);


      rabbitTemplate.sendAndReceive( 
              messageTemp.getTo() , message);
  }

消息发送到messageTemp.getTo()路由键时修改onMessage方法

@Component
public class PracticalMessageListener implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));
        String body = "{ \"processing\": \"123456789\"}";
       MessageProperties properties = new MessageProperties();

       // some business logic on the message body

        properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
        Message responseMessage = new Message(body.getBytes(), properties);

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), responseMessage);
    }

我可能误解了直接回复的概念和说明:

在 no-ack 模式下从伪队列 amq.rabbitmq.reply-to 消费。无需先声明此“队列”,尽管客户端可以根据需要声明。

问题是我需要从那个队列中消费什么?如果出现错误,如何访问修改后的消息:

2020-01-15 22:17:09.688  WARN 96222 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
Caused by: java.lang.NullPointerException: null
    at com.patrykmaryn.spring.second.PracticalMessageListener.onMessage(PracticalMessageListener.java:50) ~[classes/:na]

当我在PracticalMessageListener 中调用rabbitTemplate.convertAndSend 时,这来自该地方

编辑

我摆脱了在DirectMessageListenerContainer 中设置amq.rabbitmq.reply-to 并实现了DirectReplyToMessageListenerContainer

@Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(new DirectMessageListener());
        return drtmlc;
    }

问题一定出在onMessage 方法中,该方法不允许在rabbitTemplate 上调用任何发送方法,我尝试过使用不同的现有路由键和交换。监听来自使用路由键messageTemp.getTo() 定义的队列。

@Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));

        String receivedRoutingKey = message.getMessageProperties()
           .getReceivedRoutingKey();
        System.out.println(" This is received routingkey: " + 
            receivedRoutingKey);

           /// ..... rest of code goes here

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), 
                responseMessage);

messageTemp.getTo() 是在运行时定义的路由键,通过选择接收器,例如,如果我选择“user1”,它将打印出“user1”。

这是第一次尝试发送消息:

2020-01-16 02:22:20.213 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:22:20.214 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:22:20.239  INFO 28490 --- [nboundChannel-6] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=user1, consumerTag=amq.ctag-Evyiweew4C-K1mXmy2XqUQ identity=57b19488] started
2020-01-16 02:22:20.268  INFO 28490 --- [nboundChannel-6] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-01-16 02:22:20.269  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2020-01-16 02:22:20.286  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-IXWf-zEyI34xzQSSfbijzg identity=4bedbba5] started

第二个失败:

2020-01-16 02:23:20.247 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:23:20.248 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:23:20.248  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
2020-01-16 02:23:20.250  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
message listener..
 This is received routingkey: user1
2020-01-16 02:23:20.271  WARN 28490 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception

编辑

DirectReplyToMessageListenerContainer 放在一个单独的类中并将其MessageListener 设置为@Bean 以及 directMessageListenerContainer.setMessageListener(practicalMessageListener());@Bean 似乎摆脱了 NPE。但即使是回复到amq.rabbitmq.reply-to.g2dkABVyYWJ.....,它似乎也没有在DirectReplyToMessageListenerContainer drtmlc 中被收听。

@Component
class DirectMessageListener implements MessageListener {
    // This doesn't get invoked...
    @Override
    public void onMessage(Message message) {
        System.out.println("direct reply message sent..");

    }
}

@Component
class ReplyListener {

    @Bean
    public DirectMessageListener directMessageListener() {
        return new DirectMessageListener(); 
    }

    @Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(directMessageListener());
        return drtmlc;
    }
}

【问题讨论】:

    标签: java spring rabbitmq spring-rabbit


    【解决方案1】:

    是的,您误解了该功能。

    每个通道都有自己的伪队列;您只能从同一个频道接收,因此一般的消息侦听器容器不会破解它。

    directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");

    你根本做不到。

    框架已经支持直接回复,在RabbitTemplate 内部。 RabbitTemplate 有自己的DirectReplyToMessageListenerContainer,它维护着一个频道池。

    每个请求检出一个通道并在那里返回回复,然后将通道返回到池中以供另一个请求重用。

    使用RabbitTemplate.convertSendAndReceive();默认行为(在最近的版本中)将自动使用直接回复。

    编辑

    为什么不让框架完成所有繁重的工作,而您只需专注于业务逻辑:

    @SpringBootApplication
    public class So59760805Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59760805Application.class, args);
        }
    
        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory cf) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
            container.setQueueNames("foo");
            container.setMessageListener(new MessageListenerAdapter(new MyListener()));
            return container;
        }
    
        @Bean
        public MyExtendedTemplate template(ConnectionFactory cf) {
            return new MyExtendedTemplate(cf);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> System.out.println(template.convertSendAndReceive("", "foo", "test"));
        }
    
    }
    
    class MyListener {
    
        public String handleMessage(String in) {
            return in.toUpperCase();
        }
    
    }
    
    class MyExtendedTemplate extends RabbitTemplate {
    
        MyExtendedTemplate(ConnectionFactory cf) {
            super(cf);
        }
    
        @Override
        public void onMessage(Message message) {
            System.out.println("Response received (before conversion): " + message);
            super.onMessage(message);
        }
    
    }
    

    rabbit 模板默认使用直接回复(内部)。

    Response received (before conversion): (Body:'TEST' MessageProperties [headers={}, correlationId=1, ...receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAA5yYWJiaXRAZ29sbHVtMgAAeE0AAADmAw==.RQ/uxjR79PX/hZF+7iAdWw==, ...
    TEST
    

    【讨论】:

    • 是否意味着我需要实现DirectReplyToMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to")。?或者如果它支持直接回复设置队列amq.rabbitmq.reply-to不需要?问题是当消息到达具有路由键getReplyTo() 的队列时调用发送到队列amq.rabbitmq.reply-to 的bc,由directMessageListenerContainer 监听。即使在使用RabbitTemplate.convertSendAndReceive() 之后,我仍然得到相同的 NPE。
    • 否; rabbitTemplate.sendAndReceive(..)(或convertSendAndReceive())负责一切,包括设置replyTo 和相关ID。看来您的PracticalMessageListener 正试图从"amq.rabbitmq.reply-to" 消费,这是错误的;它应该从 messageTemp.getTo() 设置的任何内容中使用并回复 replyTo() - 如果您在调试器中检查它,您会看到它是一个唯一的伪队列名称。
    • 我不知道为什么在onMessage method 中调用rabbitTemplatesend 方法会导致NPE。 PracticalMessageListener 消耗messageTemp.getTo() ,我检查了打印message.getMessageProperties().getReceivedRoutingKey()
    • drtmlc.setMessageListener(new DirectMessageListener()); - 这意味着 Spring 不管理该 bean,因此如果您自动连接模板,它将无法工作 - 它需要是 @Bean 或者您需要传入自己做模板。但是,再一次,您不应该在那里使用DirectReplyToMessageListenerContainer - 客户端(发送/接收)的模板使用它来接收回复。服务器端只需要一个常规的侦听器容器(简单或直接)。
    • 我注册了drtmlc.setMessageListener(directMessageListener()),其中directMessageListener() 是一个单独的类中的@Bean。但仍然不收听在rabbitTemplate.convertAndSend 方法中message.getMessageProperties().getReplyTo() 中发送的消息onMessage
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多