【问题标题】:Spring Cloud Stream - modify DLQ messagesSpring Cloud Stream - 修改 DLQ 消息
【发布时间】:2020-09-28 19:16:56
【问题描述】:

我将 Spring Cloud Stream 的 DLQ 功能与 Kafka 活页夹一起使用。当消息处理失败时,消息会按预期发送到 DLQ,但是,我希望能够修改发送到 DLQ 的消息以包含一些额外的诊断信息。问题是发送到 DLQ 的消息是 original 消息;我所做的任何突变都会被忽略。到目前为止,我解决这个问题的方法是在消息发送到 DLQ 之前拦截消息,并添加存储在另一个 bean 中的额外信息。具体来说,我尝试了这两种方法:

  1. 解决方案:为 DLQ 实现一个普通的 Kafka ProducerInterceptor问题:实现是在 Spring 上下文之外实例化的,因此我无法注入我需要的其他 bean。 Spring Kafka 记录了 this solution,但是,它需要创建一个新的 ProducerFactory bean,这意味着我不能使用底层 Spring Cloud Stream 中的 bean。
  2. 解决方案:实现一个 Spring ChannelInterceptor问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。

关于如何解决这个问题的任何想法?

【问题讨论】:

    标签: spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。

    错误通道名称是

    destinationName.group.errors
    

    您可以从应用程序上下文中将其作为AbstractMessageChannel 获取...

    context.getBean("destinationName.group.errors", AbstractMessageChannel.class)
    

    ...并添加拦截器。

    或者,根本不使用 binder 的 DLQ 机制并添加您自己的错误处理程序:

    @ServiceActivator(inputChannel = "destinationName.group.errors")
    void errors(Message<?> error) {
        ...
    }
    

    【讨论】:

    • 感谢您的信息。我正在使用自定义 DLQ 主题名称;我会看看这是否仍然有效。使用自定义 DLQ 机制会很棒,但该项目已经依赖于 Spring,所以我坚持这样做。
    • 嗯,无法通过该示例获得对 DLQ 频道的引用。上下文中唯一的AbstractMessageChannelerrorChannel,即接收MessagingException 消息,而不是出站DLQ 消息。我还检查了MessageChannel bean 的上下文,我也没有看到它们。
    • 必须等到绑定注册完毕;你不能只是自动接线;您可以添加一个SmartLifecycle @Bean 将它放在比创建绑定的阶段(即Integer.MAX_VALUE - 1000)更晚(更高)的“阶段”中。 autoStartup 必须是 true 以便 Spring 调用 start() 方法(这是您应该放置代码的地方)。
    猜你喜欢
    • 2019-04-19
    • 2018-12-29
    • 2021-06-12
    • 2018-12-17
    • 2019-04-17
    • 2020-12-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多