【问题标题】:Use ImmediateRequeueMessageRecoverer in Spring Integration for AMQP?在 AMQP 的 Spring 集成中使用 ImmediateRequeueMessageRecoverer?
【发布时间】:2021-06-02 08:44:30
【问题描述】:

我们注意到,当 Spring Integration Endpoint(来自 RabbitMQ)收到错误消息时,它们不会重试。如果我们的业务代码(即接收消息的“服务方法”)出现问题而引发异常,则会按预期进行重试。

这是我们的配置:

var myService = ...
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName)
                .id(integrationFlowId)
                .autoStartup(autoStartup)
                .configureContainer(c -> c.acknowledgeMode(MANUAL)
                        .prefetchCount(10)
                        .concurrentConsumers(1)
                        .maxConcurrentConsumers(3))
                .messageConverter(messageConverter))
                .aggregate(...)
                .handle(myService, "myMethod", e -> e.advice(myAdvice()))
                .get();

myAdvice方法是这样实现的:

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(5000L);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy((new SimpleRetryPolicy(MAX_VALUE)));
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.registerListener(new RetryListenerSupport() {
    @Override
    public <T, E extends Throwable> void onError(RetryContext ctx, RetryCallback<T, E> callback, Throwable e) {
        log.error("Caught {} due to {} (count = {})", e.getClass().getSimpleName(), e.getMessage(), ctx.getRetryCount(), e);
    }
});
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
bean.setMessageRecoverer(new ImmediateRequeueMessageRecoverer());
return bean.getObject();

问题在于,例如我们发布了一条org.springframework.amqp.support.converter.MessageConverter无法转换为DTO的消息(如{ "yo" : "MTV Raps" }),则消息不会重试:

[my-service-97c696799-6xs26] org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
[my-service-97c696799-6xs26]    at java.base/java.lang.Thread.run(Thread.java:831)
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
[my-service-97c696799-6xs26]    ... 6 common frames omitted
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.support.converter.MessageConversionException: Don't know how to convert (Body:'{ "yo" : "MTV Raps" }' MessageProperties [headers={content_type=application/json}, contentType=application/json, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=myservice.routingkey, deliveryTag=1, consumerTag=amq.ctag-9De2w0uuQxnve_9k6HZ7tw, consumerQueue=myservice.myqueue]) to an object because no event type was found
[my-service-97c696799-6xs26]    at com.mycompany.RabbitMQEventMessageConverter.fromMessage(RabbitMQEventMessageConverter.java:47)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.convertPayload(AmqpInboundChannelAdapter.java:361)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createMessageFromAmqp(AmqpInboundChannelAdapter.java:342)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:334)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:299)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
[my-service-97c696799-6xs26]    ... 10 common frames omitted

似乎没有使用myAdvice()方法中指定的ImmediateRequeueMessageRecoverer,而是使用了默认的AmqpRejectAndDontRequeueException。在我看来,原因很可能是 Spring 基础设施尚未调用 myAdvice() 方法。我已经尝试找到一种方法来切换 configureContainer 中的消息恢复器,但我似乎找不到这样做的方法。

有谁知道我如何重新排队/重试失败的消息之前 spring 集成调用“服务方法”?

我们正在使用 Spring Integration 5.4.6 和 Spring Boot 2.4.6。

【问题讨论】:

    标签: java spring spring-boot spring-integration spring-amqp


    【解决方案1】:

    在创建消息之前执行转换。

    转换错误通常被认为是致命的 - 重试没有意义,因为它会再次失败。

    向入站适配器添加.errorChannel;其下游流将收到 ErrorMessage 以表示转换错误。

    但是,它也会从下游流中获取错误消息,因此您必须在那里处理所有错误类型。

    编辑

    您可以添加错误通道并在其流程中处理转换异常。但请记住,消息将一次又一次地重新传递,不会有任何延迟。

    @SpringBootApplication
    public class So67801807Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So67801807Application.class, args);
        }
    
        @Bean
        IntegrationFlow flow(ConnectionFactory cf) {
            return IntegrationFlows.from(Amqp.inboundAdapter(cf, "foo")
                        .messageConverter(new MC())
                        .errorChannel("errors"))
                    .handle(...)
                    .get();
        }
    
        @Bean
        IntegrationFlow errorFlow() {
            return IntegrationFlows.from("errors")
                    .handle(msg -> {
                        if (((ErrorMessage) msg).getPayload().getCause() instanceof MessageConversionException) {
                            throw new ImmediateRequeueAmqpException("Requeuing due to conversion");
                        }
                        else {
                            // handle some other exception thrown by the downstream flow
                        }
                    })
                    .get();
        }
    
    }
    
    class MC implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return null;
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            throw new MessageConversionException("test");
        }
    
    }
    

    或者您可以向容器添加自定义错误处理程序。默认错误处理程序认为转换异常是致命的。

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#exception-handling

    【讨论】:

    • 只是为了让我理解正确,有没有办法只重新排队消息?您是对的,我们现在无法处理此消息,但对我们来说,当一个系统 (A) 发布 (B) 感兴趣的消息的更新版本时会发生此错误,但需要先更新它才能处理正确阅读。 IE。我们忘记在 A 发布更新的消息之前更新 B。然后我们要做的是让 B 重新排队消息,以便我们可以更新它以支持新消息,然后重新部署。然后可以再次正确处理消息。我们不想丢失味精!
    • 当没有.errorChannel 时,MessageConversionException 被扔到容器中;容器的默认ErrorHandlerConditionalRejectingErrorHandler)将转换异常视为致命并抛出AmqpRejectAndDontRequeue 异常。您可以通过配置FatalExceptionStrategy 来更改容器的ErrorHandler,也可以向错误通道流添加逻辑。我将编辑我的答案以显示后者的示例。但请记住,在问题得到解决之前,该消息将不断重新传递。
    猜你喜欢
    • 2020-10-21
    • 2016-11-23
    • 1970-01-01
    • 2015-12-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多