【问题标题】:What's the earliest point of entry to read a rabbit message in spring-amqp?在 spring-amqp 中阅读兔子消息的最早入口点是什么?
【发布时间】:2018-06-27 08:24:10
【问题描述】:

我将线程本地的兔子消息数据存储在 MDC 中。我想为传入的兔子消息清除旧的并添加新的上下文数据,例如从标头中读取某些值或将兔子消息有效负载读取为byte[]。不幸的是,我经常看到在消息到达我的 @RabbitHandler 注释方法之前发生异常。是否有一个更早的入口点可以用来建立这个上下文?我不知道在反序列化发生之前会发生什么,但理想情况下我想在尝试反序列化之前访问该消息。也许某处有一个onMessageReceived(byte[] message, Map headers) 方法挂钩。调用堆栈越早越好。

【问题讨论】:

    标签: java spring rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    @RabbitHandlerAbstractRabbitListenerContainerFactory 填充,可以通过自定义 MessageConverter:https://docs.spring.io/spring-amqp/docs/2.0.1.RELEASE/reference/html/_reference.html#message-converters 提供。它的fromMessage() 是从MessagingMessageListenerAdapter.toMessagingMessage() 调用的。这是在MessagingMessageListenerAdapter.onMessage() 中完成的。那确实是您可以钩住的很早的地方。而且您确实仍然有一个原始的 org.springframework.amqp.core.Message 对象,没有任何转换并且具有所有可用的标头和属性。

    嗯,你也可以注入:

    /**
     * @param afterReceivePostProcessors the post processors.
     * @see AbstractMessageListenerContainer#setAfterReceivePostProcessors(MessagePostProcessor...)
     */
    public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
    

    出于类似的原因,您提出请求。

    【讨论】:

    • 看来我必须在转换器中完成它。谢谢,我试试这个。知道运行MessagingMessageListenerAdapter.onMessage() 的线程是否与执行@RabbitHandler 调用的线程相同?
    • 确实是一样的。只需按照我提到的调用堆栈即可。
    • 我建议setAfterReceivePostProcessors() 是最简单的路线。
    【解决方案2】:

    一种解决方案是使用SimpleMessageListenerContainer 而不是@RabbitHandler 注释,并使用自定义消息侦听器适配器。

    例子:

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter); // custom listener
        container.setMessageConverter(null); // disable default conversion
        return container;
    }
    
    @Bean
    MessageListenerAdapter listenerAdapter() {
        RawMessageDelegate delegate = new RawMessageDelegate();
        return new MessageListenerAdapter(delegate);
    }
    
    public class RawMessageDelegate {
    
        void handleMessage(Message message) {
            byte[] body = message.getBody();
            MessageProperties properties = message.getMessageProperties();
            Map<String, Object> headers = properties.getHeaders();
            // handle raw data
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2021-07-22
      • 2013-02-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多