【问题标题】:Publish messages that could not be de-serialized to DLT topic将无法反序列化的消息发布到 DLT 主题
【发布时间】:2020-02-18 15:32:54
【问题描述】:

我不明白如何使用 spring kafka 将无法反序列化的消息写入 DLT 主题。

我根据spring kafka docs配置了消费者,这对于反序列化消息后发生的异常非常有效。

但是当消息不可反序列化时,在轮询消息时会抛出org.apache.kafka.common.errors.SerializationException

随后,SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...) 被调用时出现此异常,但记录列表为空,因此无法向 DLT 主题写入内容。

如何将这些消息也写入 DLT 主题?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    问题在于异常是由 Kafka 客户端本身引发的,因此 Spring 无法看到失败的实际记录。

    这就是我们添加ErrorHandlingDeserializer2 的原因,它可以用来包装实际的反序列化器;失败被传递给侦听器容器并作为DeserializationException 重新抛出。

    the documentation

    当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer2。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer2 在包含原因和原始字节的标头中返回空值和 DeserializationException。当您使用记录级 MessageListener 时,如果 ConsumerRecord 包含键或值的 DeserializationException 标头,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给监听器。

    DeadLetterPublishingRecoverer 具有检测异常并发布失败记录的逻辑。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-25
      • 1970-01-01
      • 1970-01-01
      • 2016-10-22
      • 1970-01-01
      • 2019-11-27
      • 2019-08-15
      • 2017-09-22
      相关资源
      最近更新 更多