【问题标题】:Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configurationSpring RabbitMQ - 在具有 @RabbitListener 配置的服务上使用手动通道确认
【发布时间】:2016-12-08 07:03:35
【问题描述】:

如何在不使用自动确认的情况下手动确认消息。 有没有办法将它与@RabbitListener@EnableRabbit 配置样式一起使用。 大多数文档告诉我们使用SimpleMessageListenerContainerChannelAwareMessageListener。 然而,使用它我们失去了注释提供的灵活性。 我的服务配置如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}

我的 RabbitConfiguration 如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory;
    }

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    return connectionFactory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setContainerFactory(myRabbitListenerContainerFactory());
}

@Autowired
private EventReceiver receiver;
}
}

对于如何适应手动通道确认以及上述配置样式的任何帮助,我们将不胜感激。 如果我们实现 ChannelAwareMessageListener,那么 onMessage 签名将会改变。 我们可以在服务上实现 ChannelAwareMessageListener 吗?

【问题讨论】:

  • 一个问题是为什么你甚至需要这样做。如果您的代码与下面的答案类似(拒绝失败,否则确认),容器将使用 AUTO ack 模式自动为您执行此操作 - 如果侦听器抛出异常,则消息将被拒绝;否则确认。

标签: java rabbitmq spring-amqp spring-rabbit


【解决方案1】:

Channel 添加到@RabbitListener 方法...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

并使用basicAckbasicReject中的标签。

编辑

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual

【讨论】:

  • 感谢您的建议,我们尝试了您的建议并输入channel.basicAck('100001', false)。现在无论我在上面的代码行中输入“true”还是“false”,侦听器和队列都会进入无限循环。那么你能帮我解决这个问题吗?
  • 我们终于解决了这个问题。为了他人的利益,我正在记录这一点。
  • 我收到一个错误,即 spring.rabbitmq.listener.acknowledge-mode 是一个已弃用的属性。我最终在我的 RabbitListenerContainerFactory Bean 上设置了这个属性并让它以这种方式工作。
  • 该属性为moved to spring.rabbitmq.listener.simple.acknowledge-mode。在 Spring Boot 2.0 中,它可以是 spring.rabbitmq.listener.simple.acknowledge-modespring.rabbitmq.listener.direct.acknowledge-mode,因为 Spring AMQP 现在支持 2 种容器类型。见the documentation
  • 有人可以向我解释为什么 java 程序员拒绝将导入放在他们的代码示例中吗?我觉得这可以节省我的时间。
【解决方案2】:

以防万一您需要使用 ChannelAwareMessageListener 类中的#onMessage()。那你就可以这样了。

@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) {
        log.info("Message received.");
        // do something with the message
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

对于rabbit的配置

@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";

    public static final String queueName = "queue1";

    public static final String routingKey = "queue1.route.#";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxxxxxxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vHost1");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }


    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(myRabbitMessageListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency("4");
        listenerContainer.setPrefetchCount(20);
        return listenerContainer;
    }
}

【讨论】:

  • 嗨,Pari,这里的 POCRabbitMessageListener 是什么?
  • 我不太明白你的问题,请你改一下
  • @java1977 POCRabbitMessageListener 是 MyMessageListener。我的坏我没有注意到它。已编辑,谢谢。
【解决方案3】:

感谢加里的帮助。我终于解决了这个问题。为了他人的利益,我正在记录这一点。 这需要作为标准文档的一部分记录在 Spring AMQP 参考文档页面中。 服务等级如下。

   @Service
    public class Consumer {

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order, Channel channel) throws Exception {



 // the above methodname can be anything but should have channel as second signature

    channel.basicConsume(eventQueue, false, channel.getDefaultConsumer()); 
    // Get the delivery tag
    long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
    try {

      // code for processing order

    catch(Exception) {
     // handle exception
        channel.basicReject(deliveryTag, true);
    }
    // If all logic is successful 
    channel.basicAck(deliveryTag, false);
}

配置也做了如下修改

public class RabbitApplication implements RabbitListenerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(RabbitApplication .class);

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Autowired
    private Consumer consumer;

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

      ...
}

注意:不需要配置 Rabbitconnectionfactory 或 containerfactor 等,因为注解会隐式处理所有这些。

【讨论】:

  • 不 - 你不应该对频道发出 basicConsumebasicGet - basicGet 将获取另一条消息。侦听器容器已经在使用它,并且用于调用该方法的消息具有不同的传递标记。相反,请使用@Header(AmqpHeaders.DELIVERY_TAG) long tag。请参阅我的答案(编辑)。
  • Gary,删除了 basicConsume 和 basicGet 并在 basicAck/basicReject 中使用了@Header(AmqpHeaders.DELIVERY_TAG) 长标签 流程停止工作。在无限循环中一次又一次地获取队列,并填充目标队列。将代码恢复为 basicGet 和 basicConsume 并且它正在工作。
  • 但它不起作用 - 您正在确认(并丢弃)下一条消息。
  • 所以你的意思是我应该使用某种消耗来避免它。不清楚,因为您之前说过我们不应该使用 basicConsume。
  • 我刚刚编写了一个快速的 Spring Boot 应用程序,它的工作方式与我描述的完全一样——我用代码编辑了我的答案。完整的项目是herecommit。如果您在basicAck 上设置断点,您可以在 Rabbit Admin UI 中看到未确认的消息;走过去,消息被确认。
猜你喜欢
  • 2017-12-22
  • 2016-12-27
  • 2017-12-31
  • 2023-03-24
  • 1970-01-01
  • 2018-06-20
  • 1970-01-01
  • 2015-06-21
  • 1970-01-01
相关资源
最近更新 更多