【问题标题】:How to manually acknowledge RabbitMQ messages in Spring Cloud Stream?如何在 Spring Cloud Stream 中手动确认 RabbitMQ 消息?
【发布时间】:2018-06-20 05:31:59
【问题描述】:

对于基于流的服务,当@StreamListener 中调用的基础服务失败时,我希望消息保留在队列中。为此,我的理解是,这样做的唯一方法是配置spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL

进行此配置更改后,我尝试将@Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag 作为方法参数添加到我现有的@StreamListener 实现中,如https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack 中所述。有了这段代码,我遇到了以下异常:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)

然后我发现了以下内容:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples,它显示了如何使用 Kafka 执行消息确认的示例,但我目前使用的是 RabbitMQ 绑定。我们计划最终迁移到 Kafka,但是现在,我如何配置和编写解决方案来为成功处理的消息进行手动消息确认和手动消息拒绝,从而在遇到异常时将消息留在队列中。我目前在 Spring Cloud Edgware.RELEASE 和 Spring Cloud Stream Ditmars.RELEASE

更新

现在我有以下配置:

spring:
  cloud:
    stream:
      bindings:
        do-something-async-reply:
          group: xyz-service-do-something-async-reply
      rabbit:
        bindings:
          do-something-async-reply:
            consumer:
              autoBindDlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000
              requeueRejected: true

我在服务启动时收到以下错误:

2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)

什么配置错误/我错过了什么?

【问题讨论】:

    标签: spring-integration spring-cloud-stream spring-integration-amqp


    【解决方案1】:

    属性名称不正确;你错过了.rabbit。这是

    spring.cloud.stream.rabbit.bindings.&lt;channel&gt;.consumer.acknowledge-mode=MANUAL

    因为这是兔子特有的属性 - 请参阅 the documentation

    编辑

    例子:

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So481977082Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So481977082Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
            System.out.println(in);
            Thread.sleep(60_000);
            channel.basicAck(tag, false);
            System.out.println("Ackd");
        }
    
    }
    

    请记住,对 MANUAL ack 的需求通常是一种气味;通常最好让容器处理ack;在同一个文档链接中查看requeueRejected。无条件重新排队会导致无限循环。

    EDIT2

    对我来说很好......

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public class So48197708Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So48197708Application.class, args);
        }
    
        @Bean
        ApplicationRunner runner(MessageChannel output) {
            return args -> {
                output.send(new GenericMessage<>("foo"));
            };
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(@Header(name = "x-death", required = false) List<?> death) {
            System.out.println(death);
            throw new RuntimeException("x");
        }
    
    }
    

    spring:
      cloud:
        stream:
          bindings:
            input:
              group: foo
              content-type: application/json
              destination: foo
              consumer:
                max-attempts: 1
            output:
              content-type: application/json
              destination: foo
          rabbit:
            bindings:
              input:
                consumer:
                  auto-bind-dlq: true
                  dlqDeadLetterExchange:
                  dlqTtl: 10000
    

    结果:

    null
    ...
    Caused by: java.lang.RuntimeException: x
    ...
    [{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
        {reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
    ...
    
    ...
    [{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
        {reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]
    

    【讨论】:

    • Gary,将 requeueRejected 设置为 true,当@StreamListener 抛出异常时,消息是否会自动重新排队?这就是所有需要做的事情吗?
    • 好的,我在docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/… 找到了一些相关信息。当文档声明“将 dlqDeadLetterExchange 设置为默认交换”时,默认交换的值是多少?
    • AmqpRejectAndDontRequeueException 之外的任何异常都会导致消息重新排队,只要禁用重试即可。如果启用了重试,则当重试用尽时,绑定器将抛出该异常。还有republishToDlq,活页夹将在其中发布故障,包括包含有关故障信息的标头。默认交换是一个空字符串。所有队列都以它们的队列名称作为路由键绑定到该交换。
    • Putting it all together 所示,您只需为dlqDeadLetterExchange 使用空白值而不是''。
    • 查看我的第二次编辑以获取工作示例。由于您的队列已经存在,您必须将其删除;您不能更改现有队列的参数。
    猜你喜欢
    • 2020-07-26
    • 1970-01-01
    • 2016-12-27
    • 2021-06-03
    • 2015-06-21
    • 2017-12-31
    • 2017-10-02
    • 1970-01-01
    • 2018-01-24
    相关资源
    最近更新 更多