【发布时间】:2016-06-20 14:02:57
【问题描述】:
我为带有 Json 有效负载的 AMQP 入站消息编写了一个简单的流程,类似于
IntegrationFlows
.from(Amqp.inboundGateway(connectionFactory, new Queue("qin"))
.errorChannel(Amqp.channel("dlx", connectionFactory))
)
.handle(new MessageTransformingHandler(m -> {
Object result = null;
try {
result = (...)
} catch (Exception e) {
throw new MessageTransformationException(m, e.getMessage());
}
(...)
}))
.transform(Transformers.toJson(...))
.handle(Amqp.outboundAdapter(new RabbitTemplate(connectionFactory))
.routingKey("qout"))
.get();
}
这完全没问题,除了出现错误!现在我确实在 DLX 中遇到了错误,但在 content_type: application/x-java-serialized-object 中,它必须是 application/json。 p>
我可以通过让错误通道指定 2 个转换器来做到这一点
.amqpMessageConverter(...)
.messageConverter(...)
但问题是我必须自己实现,这并不容易,因为我必须处理将消息转换为 ampqmessage 以及业务对象、连接错误对象和文本等等...
所以我在想如果我不能在错误通道前面有一个适配器,至少负责消息->amqpmessage 转换(希望也是有效负载)。
我也尝试过使用 errorHandler 而不是 errorChannel,但问题是一样的。
有什么建议吗?
提前致谢。
已编辑
非常感谢您的回复。但是我正在努力解决它。经过多次尝试和错误,我终于认为我理解了解决方案(使用“中介”通道,以便我可以在将消息发送到 Amqp 之前处理消息?)但我仍然无法让它工作。我现在有
.errorChannel(MessageChannels.direct("amqpErrorChannel").get())
以及监听该频道的流程
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("amqpErrorChannel")
.handle(new MessageTransformingHandler(m ->(...)
但我还是有错误
MessageDeliveryException: Dispatcher 没有频道订阅者 'amqpErrorChannel'。
任何指向我做错了什么的指针?
干杯。
【问题讨论】: