【问题标题】:Spring AMQP: Adding Message Processors To Auto Configured `RabbitTemplate`Spring AMQP:将消息处理器添加到自动配置的“RabbitTemplate”
【发布时间】:2019-05-15 17:49:07
【问题描述】:

我正在尝试调用 RabbitTemplate#addBeforePublishPostProcessorsRabbitTemplate#addAfterReceivePostProcessors 而不会过多地干扰 Spring 的自动配置。

我正在尝试执行此操作,但我的 MessagePostProcessor 没有触发(我在发布的消息中没有看到“test_header”)。

  @EventListener
  void test(ApplicationPreparedEvent event) {
    ConfigurableApplicationContext applicationContext = event.getApplicationContext();
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    rabbitTemplate.addBeforePublishPostProcessors(new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("test_header", "test_header_value");
        return message;
      }
    });
  }

什么地方适合做这个?

我也试过收听ApplicationStartedEvent

更新:

根据 Gary 的建议,在我的 @Configuration 课程中添加了这个 bean:

  @Bean
  public static BeanPostProcessor rabbitTemplatePostProcessor() {
    return new BeanPostProcessor() {
      @Override
      public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof RabbitTemplate) {
          RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;

          rabbitTemplate.addBeforePublishPostProcessors(m -> {
            m.getMessageProperties()
              .setHeader(MESSAGE_PUBLISHED_TIME, currentTimeMillis());
            return m;
          });

          rabbitTemplate.addAfterReceivePostProcessors(m -> {
            m.getMessageProperties().setHeader(MESSAGE_RECEIVED_TIME, currentTimeMillis());
            return m;
          });
        }
        return bean;
      }
    };
  }

如果您正在使用@RabbitListener@SendTo,如果您正在寻找有关如何执行此操作的答案的任何人,请参阅 Gary 对其答案的编辑。

【问题讨论】:

    标签: java spring-boot spring-amqp


    【解决方案1】:

    使用BeanPostProcessor

    @SpringBootApplication
    public class So56155062Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56155062Application.class, args);
        }
    
        @Bean
        public static BeanPostProcessor bpp() {
            return new BeanPostProcessor() {
    
                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    if (bean instanceof RabbitTemplate) {
                        ((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
                            m.getMessageProperties().setHeader("foo", "baz");
                            return m;
                        });
                    }
                    return bean;
                }
    
            };
    
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("foo", "bar");
        }
    
        @RabbitListener(queues = "foo")
        public void listen(String in, @Header("foo") String header) {
            System.out.println(in + header);
        }
    
    }
    

    注意static 修饰符

    编辑

    模板不用于回复;后处理器转而使用容器工厂。

    @SpringBootApplication
    public class So56155062Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56155062Application.class, args);
        }
    
        @Bean
        public static BeanPostProcessor bpp() {
            return new BeanPostProcessor() {
    
                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    if (bean instanceof RabbitTemplate) {
                        ((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
                            m.getMessageProperties().setHeader("foo", "baz");
                            m.getMessageProperties().setReplyTo("bar");
                            System.out.println("Adding header to outgoing message with payload: " + new String(m.getBody()));
                            return m;
                        });
                    }
                    else if (bean instanceof AbstractRabbitListenerContainerFactory) {
                        ((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
                            m.getMessageProperties().setHeader("qux", "fiz");
                            System.out.println("Adding header to incoming message with payload: " + new String(m.getBody()));
                            return m;
                        });
                        ((AbstractRabbitListenerContainerFactory<?>) bean).setBeforeSendReplyPostProcessors(m -> {
                            m.getMessageProperties().setHeader("foo", "baz");
                            m.getMessageProperties().setReplyTo("bar");
                            System.out.println(
                                    "Adding header to outgoing reply message with payload: " + new String(m.getBody()));
                            return m;
                        });
                    }
                    return bean;
                }
    
            };
    
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("foo", "bar");
        }
    
        @RabbitListener(queues = "foo")
        @SendTo
        public String listen1(String in, @Header("foo") String header) {
            System.out.println(in + header);
            return in.toUpperCase();
        }
    
        @RabbitListener(queues = "bar")
        public void listen2(String in) {
            System.out.println(in);
        }
    
    }
    

    Adding header to outgoing message with payload: bar
    Adding header to incoming message with payload: bar
    barbaz
    Adding header to outgoing reply message with payload: BAR
    Adding header to incoming message with payload: BAR
    BAR
    

    【讨论】:

    • 加里,我试过这个,但当它最初不起作用时,我想我一定错过了一些非常基本的东西。但是,在通过文档不仅针对 spring-amqp 消息后处理,而且针对 spring bean 生命周期之后,我仍然无法让它工作。我添加的其他标题显示正常。只是通过beforePublishafterReceive 后处理器添加的标头没有显示。我使用@RabbitListener 接收和@SendTo 发送是否相关?这些注释是否使用不同的 RabbitTemplate?
    • 我已将BeanPostProcessor 代码添加到我的问题中。
    • 模板不用于回复;我们在侦听器容器通道上发布回复。后处理器继续使用容器工厂 - 请参阅我的答案的编辑。
    • 这很棒!谢谢加里!我通常更喜欢不要弄乱 Spring 的自动配置,因为它可以很好地公开所有这些属性以供以后调整:docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/html/…如果它可以检测到我的 MessagePostProcessor bean 并将它们添加到任何兔子模板和容器工厂,那就太好了?
    • 这有点棘手,因为我们需要某种指标(标记接口或方法)来告诉我们它是之前还是之后和/或它是针对消费者和/或生产者的。还有可能应用它们的顺序问题。但是,我想,并非不可能。随意打开一个针对 spring-amqp 的 github 问题,我们来看看。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-12-19
    • 2017-08-15
    • 2022-08-15
    • 1970-01-01
    • 1970-01-01
    • 2012-10-01
    • 2020-05-18
    相关资源
    最近更新 更多