【问题标题】:AMQP Spring Integration error handlingAMQP Spring 集成错误处理
【发布时间】:2015-09-24 17:05:44
【问题描述】:

我的集成流程如下所示:

@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
                                 @Qualifier("writeChannel") MessageChannel messageChannel,
                                 @Qualifier("parseErrorChannel") MessageChannel errorChannel) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory, auditQueue)
                    .errorChannel(errorChannel)
                    .concurrentConsumers(numConsumers)
                    .messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
            .enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
                    o -> getNamespace(o.getPayload())))
            .channel(messageChannel)
            .get();
}

如果引入了格式错误的消息,消息转换器当然会出错,从而引发 MessageConversionException。在这种情况下,我当然不希望消息重新排队——但我也不想将默认值设置为不重新排队拒绝的消息,就像我可以在 AmqpInboundAdapterSpec 上所做的那样。什么是我不重新排队以这种方式出错的消息(否则出于调试目的重新发布它们)的正确方法?

更一般地说,同一流中的下游进程可能会错误地处理语义上更畸形的数据 - 再说一次,我不想让它们重新排队。那时我可以抛出AmqpRejectAndDontRequeueException,但是我失去了关注点的分离,这是重点的一半。处理这些异常的正确方法是什么——也许有一种方法可以转换为AmqpRejectAndDontRequeueException

【问题讨论】:

    标签: java spring spring-integration amqp spring-amqp


    【解决方案1】:

    入站适配器的SimpleMessageListenerContainerConditionalRejectingErrorHandler)中的默认错误处理程序就是这样做的(它检测到MessageConversionException 并抛出AmqpRejectAndDontRequeueException)。

    您可以通过注入自己的FatalExceptionStrategy 来自定义ConditionalRejectingErrorHandler,以查找其他异常类型并以相同的方式处理它们。

    DefaultExceptionStrategy 看起来像这样...

    @Override
    public boolean isFatal(Throwable t) {
        if (t instanceof ListenerExecutionFailedException
                && t.getCause() instanceof MessageConversionException) {
            if (logger.isWarnEnabled()) {
                logger.warn("Fatal message conversion error; message rejected; "
                        + "it will be dropped or routed to a dead letter exchange, if so configured: "
                        + ((ListenerExecutionFailedException) t).getFailedMessage(), t);
            }
            return true;
        }
        return false;
    }
    

    只有在原因链中还没有 AmqpRejectAndDontRequeueException 时才会调用它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-05-24
      • 2018-07-05
      • 2015-10-03
      • 2020-10-21
      • 2017-12-10
      • 1970-01-01
      相关资源
      最近更新 更多