【问题标题】:Taking only specific message from @RabbitListener仅从 @RabbitListener 获取特定消息
【发布时间】:2019-11-02 21:32:22
【问题描述】:

我有向 RabbitMQ 发送消息的旧系统。 系统只使用一个队列:q.finance.invoice,但它有两种类型的消息,其中消息类型在标题中可用。

第一种

Type : invoice.created
{
  "field_1" : "",
  "field_2" : "",
}

第二种

Type : invoice.paid

{
  "field_5" : "",
  "field_6" : "",
}

所以现在我的消费者需要根据数据类型有选择地处理消息。 Spring 有@RabbitHandler 可以做到这一点......如果消息是由 spring 发布的。 我不能使用 @RabbitHandler 注释。 我认为这是因为 @RabbitHandler 正在转换基于旧系统中不存在的 __TypeId__ 标头的消息。

如何模拟这种@RabbitHandler 行为(根据其类型获取数据)?

所以我使用@RabbitListener 来消费消息。 但是@RabbitListener 正在接收所有类型的消息。 我们使用@RabbitListener 的另一个原因是因为我们的错误处理程序依赖于MessageChannel 我们拥有的基本方法签名是这样的:

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
   // convert message body JSON string to object
   // process it
}

我正在尝试根据类型进行手动拒绝,这很有效。但是当我有很多听众或队列时,我确信它是不可扩展的

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@Service
public class InvoiceListenerOnMethod {

    private static final Logger log = LoggerFactory.getLogger(InvoiceListenerOnMethod.class);

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoiceCreated(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.created", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice created : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice created : {}", message);
    }

    @RabbitListener(queues = "q.finance.invoice")
    public void listenInvoicePaid(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        if (!StringUtils.equalsIgnoreCase("invoice.paid", message.getMessageProperties().getType())) {
            log.warn("[on Method] Rejecting invoice paid : {}", message);
            channel.basicReject(tag, true);
            return;
        }

        log.info("[on Method] Listening invoice paid : {}", message);
    }

}

看,如果我有 4 条消息(paid-paid-created-created),监听器可以运行超过 4 次,因为我们无法控制谁将接收哪条消息。所以listenInvoicePaid()可以是这样的

  • 拒绝()
  • 拒绝()
  • 确认()
  • 拒绝()
  • 确认()

同样的方式,在 ack() 之前多次拒绝() 也可以在 listenInvoiceCreated()
中发生 因此,在正确处理所有消息之前,我总共可以调用大约 10 条消息。

有什么修复代码的建议吗?

【问题讨论】:

    标签: spring-boot spring-amqp spring-rabbit


    【解决方案1】:

    可能的实施

    这是天真的 if-else 方法,谢谢 Mark。这是您的建议(第一种选择)。 至于第二种选择,我不能这样做,因为发布者是我没有代码的遗留系统

        @RabbitListener(queues = "q.finance.invoice")
        public void listenInvoiceCreated(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                @Header("type") String type) throws IOException {
            if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                log.info("Delegate to invoice paid handler");
            } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                log.info("Delegate to invoice created handler");
            } else {
                log.info("Delegate to default handler");
            }
        }
    

    第二个实施方案
    这是我实现的,感谢 Gary。我认为这是更清洁的方法。接下来我只需要将消息后处理器提取到其他类以进行维护,所以我不会弄乱我的@RabbitListener

    配置文件

    import java.util.Optional;
    
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import com.course.finance.message.invoice.InvoiceCreatedMessage;
    import com.course.finance.message.invoice.InvoicePaidMessage;
    
    @Configuration
    public class RabbitmqConfig {
    
        @Bean(name = "rabbitListenerContainerFactory")
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);
    
            factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
    
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    var type = message.getMessageProperties().getHeaders().get("type").toString();
                    String typeId = null;
    
                    if (StringUtils.equalsIgnoreCase(type, "invoice.paid")) {
                        typeId = InvoicePaidMessage.class.getName();
                    } else if (StringUtils.equalsIgnoreCase(type, "invoice.created")) {
                        typeId = InvoiceCreatedMessage.class.getName();
                    }
    
                    Optional.ofNullable(typeId).ifPresent(t -> message.getMessageProperties().setHeader("__TypeId__", t));
    
                    return message;
                }
    
            });
    
            return factory;
        }
    
        @Bean
        Jackson2JsonMessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
    }
    

    听众

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    import com.course.finance.message.invoice.InvoiceCreatedMessage;
    import com.course.finance.message.invoice.InvoicePaidMessage;
    
    @Service
    @RabbitListener(queues = "q.finance.invoice")
    public class InvoiceListener {
    
        private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
    
        @RabbitHandler
        public void listenInvoiceCreated(InvoiceCreatedMessage message) {
            log.info("Listening invoice created : {}", message);
        }
    
        @RabbitHandler
        public void listenInvoicePaid(InvoicePaidMessage message) {
            log.info("Listening invoice paid : {}", message);
        }
    
        @RabbitHandler(isDefault = true)
        public void listenDefault(Message message) {
            log.info("Default invoice listener : {}", message.getMessageProperties().getHeaders());
        }
    
    }
    

    【讨论】:

      【解决方案2】:

      您可以将MessagePostProcessor 添加到容器工厂的afterReceiveMessagePostProcessor 属性中。在后处理器中,您可以检查 JSON body() 并将 __TypeId__ 标头设置为适当的类名。

      有关示例,请参阅 this answer

      【讨论】:

        【解决方案3】:

        我没有使用过 rabbit 的 spring 集成,但总而言之,拥有一个处理不同消息类型的单个队列的想法听起来有点问题:

        许多消费者可能会收到他们无法处理的类型的消息并不得不拒绝它们,这样消息就会返回到rabbit,然后一次又一次......所有集群的性能都会恶化因为这个。

        所以我认为你可以走两条路:

        • 实现可以处理两种类型消息的单个侦听器。无需更改 Rabbit,但在 java 方面可能是一个具有挑战性的重构。

        • 幸运的是,Rabbit MQ 在路由消息方面非常灵活。配置交换以根据路由键、任何标题将类型 A 的消息路由到队列 A 和类型 B 的消息路由到队列 B,Rabbit 中有不同类型的交换,您肯定会找到最适合您的配置.

        我个人会选择第二条路。

        【讨论】:

        • 嗯,这是一个遗留应用程序,我没有源代码,因此无法进行交换和路由。但我认为第一个想法应该可行,它只是使用 if-else 的某种幼稚代码
        • 至于第二个想法,基本上你不需要遗留应用的源代码,你可以在rabbit mq服务器本身内部进行路由(使用它的UI或命令行)......
        猜你喜欢
        • 1970-01-01
        • 2020-06-29
        • 1970-01-01
        • 1970-01-01
        • 2016-05-10
        • 2012-10-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多