【发布时间】:2019-07-14 15:11:07
【问题描述】:
我无法升级到 Spring 5,所以我只能使用 spring-kafka 1.3 及其有限的错误处理。所以我无法访问 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 错误处理程序。
我正在使用@KafkaListener-annotated 方法来收听一个主题,我已将io.confluent.kafka.serializers.KafkaAvroDeserializer 配置为我的值反序列化器。
问题是,如果我在主题中得到一条不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 轮询循环就会卡住。反序列化程序会在消息上引发异常,并且轮询循环永远不会越过它,因此下次通过循环时,它会尝试反序列化相同的消息并继续循环,每秒将相同的错误转储到我的日志中数千次。
似乎没有办法获得 NeverRetryPolicy 或边缘方面的任何东西,但我可以factory.getContainerProperties().setErrorHandler()。不幸的是,我不确定我能从那里做什么。
有什么东西可以自动装配到我的错误处理程序中,我可以用它来寻找错误时的前向 1 个偏移量吗?不确定那是什么,文档并没有过多地谈论您可以使用 ErrorHandler 实际做什么,我能找到的大多数示例都是针对 spring-kafka 2.X。就像,它不会反序列化,我对消息无能为力,它永远不会起作用,我想避免再次重试它,似乎大多数 Stackoverflow 问题都是关于做相反的事情。
我还看到有些人只是将 Avro 中的反序列化器与他们自己的类包装在一起,该类会吃掉异常并返回 null。这是一个更好的计划吗?
【问题讨论】:
标签: java spring apache-kafka avro spring-kafka