【发布时间】:2015-09-24 17:05:44
【问题描述】:
我的集成流程如下所示:
@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("writeChannel") MessageChannel messageChannel,
@Qualifier("parseErrorChannel") MessageChannel errorChannel) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, auditQueue)
.errorChannel(errorChannel)
.concurrentConsumers(numConsumers)
.messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
.enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
o -> getNamespace(o.getPayload())))
.channel(messageChannel)
.get();
}
如果引入了格式错误的消息,消息转换器当然会出错,从而引发 MessageConversionException。在这种情况下,我当然不希望消息重新排队——但我也不想将默认值设置为不重新排队拒绝的消息,就像我可以在 AmqpInboundAdapterSpec 上所做的那样。什么是我不重新排队以这种方式出错的消息(否则出于调试目的重新发布它们)的正确方法?
更一般地说,同一流中的下游进程可能会错误地处理语义上更畸形的数据 - 再说一次,我不想让它们重新排队。那时我可以抛出AmqpRejectAndDontRequeueException,但是我失去了关注点的分离,这是重点的一半。处理这些异常的正确方法是什么——也许有一种方法可以转换为AmqpRejectAndDontRequeueException?
【问题讨论】:
标签: java spring spring-integration amqp spring-amqp