【问题标题】:How can I log kafka messages that are not compatible to avro schema in Kafka Consumer?如何在 Kafka Consumer 中记录与 avro 模式不兼容的 kafka 消息?
【发布时间】:2022-01-12 12:13:07
【问题描述】:

我需要记录到达不兼容 Avro 架构的 Kafka Consumer 的消息。

我怎样才能做到这一点?

我想默认情况下,Avro 不兼容的消息永远不会到达我的 Kafka 消费者代码。我使用 Spring 的@KafkaListener

【问题讨论】:

    标签: apache-kafka kafka-consumer-api avro spring-kafka


    【解决方案1】:

    假设您使用的是 Confluent Schema Registry,生产者将无法发送不兼容的记录,因为 Registry 将执行兼容性检查并返回异常以停止生产者。这些记录不会到达消费者,正确,但不是来自您设置的任何 Spring / 消费者属性。

    除此之外,任何未配置为使用模式注册表(或错误注册表)的生产者的毒丸处理都需要通过反序列化处理程序进行设置。在此处参考答案中的属性 - How to configure Kafka consumer retry property from application.properties in spring boot?

    【讨论】:

      【解决方案2】:

      使用ErrorHandlingDeserializer - 反序列化失败的记录直接发送到容器的错误处理程序。

      https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

      【讨论】:

      • 感谢您的反馈。你知道我如何提供一个 errorHandler 来处理 ErrorHandlingDeserializer 生成的这个错误(例如,用于持久化到数据库)但不覆盖默认的错误处理程序吗?例如在将它传递给 factory.setErrorHandler() 方法之前我应该​​扩展哪个类?
      • >without overwriting the default error handler? 你不能。您必须使用自定义恢复器配置您自己的错误处理程序(以处理失败的记录)。对于 2.8 及更高版本,配置 DefaultErrorHandler (docs.spring.io/spring-kafka/docs/current/reference/html/…)。对于早期版本,配置一个SeekToCurrentErrorHandlerdocs.spring.io/spring-kafka/docs/2.7.x/reference/html/…);无需扩展类,根据需要配置即可。
      猜你喜欢
      • 2021-12-28
      • 1970-01-01
      • 2020-12-13
      • 2023-03-09
      • 2016-02-22
      • 1970-01-01
      • 2019-12-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多