【发布时间】:2020-09-28 19:16:56
【问题描述】:
我将 Spring Cloud Stream 的 DLQ 功能与 Kafka 活页夹一起使用。当消息处理失败时,消息会按预期发送到 DLQ,但是,我希望能够修改发送到 DLQ 的消息以包含一些额外的诊断信息。问题是发送到 DLQ 的消息是 original 消息;我所做的任何突变都会被忽略。到目前为止,我解决这个问题的方法是在消息发送到 DLQ 之前拦截消息,并添加存储在另一个 bean 中的额外信息。具体来说,我尝试了这两种方法:
-
解决方案:为 DLQ 实现一个普通的 Kafka
ProducerInterceptor。 问题:实现是在 Spring 上下文之外实例化的,因此我无法注入我需要的其他 bean。 Spring Kafka 记录了 this solution,但是,它需要创建一个新的ProducerFactorybean,这意味着我不能使用底层 Spring Cloud Stream 中的 bean。 -
解决方案:实现一个 Spring
ChannelInterceptor。 问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。
关于如何解决这个问题的任何想法?
【问题讨论】:
标签: spring-cloud-stream spring-cloud-stream-binder-kafka