【发布时间】:2022-01-12 12:13:07
【问题描述】:
我需要记录到达不兼容 Avro 架构的 Kafka Consumer 的消息。
我怎样才能做到这一点?
我想默认情况下,Avro 不兼容的消息永远不会到达我的 Kafka 消费者代码。我使用 Spring 的@KafkaListener。
【问题讨论】:
标签: apache-kafka kafka-consumer-api avro spring-kafka
我需要记录到达不兼容 Avro 架构的 Kafka Consumer 的消息。
我怎样才能做到这一点?
我想默认情况下,Avro 不兼容的消息永远不会到达我的 Kafka 消费者代码。我使用 Spring 的@KafkaListener。
【问题讨论】:
标签: apache-kafka kafka-consumer-api avro spring-kafka
假设您使用的是 Confluent Schema Registry,生产者将无法发送不兼容的记录,因为 Registry 将执行兼容性检查并返回异常以停止生产者。这些记录不会到达消费者,正确,但不是来自您设置的任何 Spring / 消费者属性。
除此之外,任何未配置为使用模式注册表(或错误注册表)的生产者的毒丸处理都需要通过反序列化处理程序进行设置。在此处参考答案中的属性 - How to configure Kafka consumer retry property from application.properties in spring boot?
【讨论】:
使用ErrorHandlingDeserializer - 反序列化失败的记录直接发送到容器的错误处理程序。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
【讨论】:
>without overwriting the default error handler? 你不能。您必须使用自定义恢复器配置您自己的错误处理程序(以处理失败的记录)。对于 2.8 及更高版本,配置 DefaultErrorHandler (docs.spring.io/spring-kafka/docs/current/reference/html/…)。对于早期版本,配置一个SeekToCurrentErrorHandler(docs.spring.io/spring-kafka/docs/2.7.x/reference/html/…);无需扩展类,根据需要配置即可。