【问题标题】:How to move past undeserializable messages in Spring Kafka 1.3 with Avro如何使用 Avro 在 Spring Kafka 1.3 中移过不可反序列化的消息
【发布时间】:2019-07-14 15:11:07
【问题描述】:

我无法升级到 Spring 5,所以我只能使用 spring-kafka 1.3 及其有限的错误处理。所以我无法访问 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 错误处理程序。

我正在使用@KafkaListener-annotated 方法来收听一个主题,我已将io.confluent.kafka.serializers.KafkaAvroDeserializer 配置为我的值反序列化器。

问题是,如果我在主题中得到一条不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 轮询循环就会卡住。反序列化程序会在消息上引发异常,并且轮询循环永远不会越过它,因此下次通过循环时,它会尝试反序列化相同的消息并继续循环,每秒将相同的错误转储到我的日志中数千次。

似乎没有办法获得 NeverRetryPolicy 或边缘方面的任何东西,但我可以factory.getContainerProperties().setErrorHandler()。不幸的是,我不确定我能从那里做什么。

有什么东西可以自动装配到我的错误处理程序中,我可以用它来寻找错误时的前向 1 个偏移量吗?不确定那是什么,文档并没有过多地谈论您可以使用 ErrorHandler 实际做什么,我能找到的大多数示例都是针对 spring-kafka 2.X。就像,它不会反序列化,我对消息无能为力,它永远不会起作用,我想避免再次重试它,似乎大多数 Stackoverflow 问题都是关于做相反的事情。

我还看到有些人只是将 Avro 中的反序列化器与他们自己的类包装在一起,该类会吃掉异常并返回 null。这是一个更好的计划吗?

【问题讨论】:

    标签: java spring apache-kafka avro spring-kafka


    【解决方案1】:

    问题是反序列化在 Spring 获取数据之前就失败了 - 问题出在 Kafka 本身。

    在 2.2 中,我们添加了 ErrorHandlingDeserializer2,它封装了真正的反序列化器并向侦听器容器发送信号,以便可以将错误发送到错误处理程序。

    在旧版本中,您需要编写自己的反序列化器包装器 - 但是,容器中没有处理这种情况的代码,因此您的 catch 块需要返回一个真实的对象,该对象是向您的侦听器发出的信号反序列化失败。

    假设您的听众收到Invoice。您的 catch 块可以创建一个子类,例如 BadInvoice,然后您可以在侦听器中检测并丢弃它。

    我还看到有些人只是将 Avro 的反序列化器与他们自己的类包装在一起,该类吃掉异常并返回 null。这是一个更好的计划吗?

    如果您从未获得真正的空记录,您可以返回null,但您必须将@Payload(required = false) 添加到方法参数中。

    【讨论】:

    • 问题:是的,kafka 内部的反序列化失败,并且确实弄乱了轮询循环,因此它永远不会到达侦听器,但我确实在 ErrorHandler 方法中得到了 SerializationException,是否有可能访问 SOMETHING 到在那里寻找?我已经尝试实现 ConsumerSeekAware,但是当 ErrorHandler 尝试访问它时,我的 ThreadLocal.get() 为空...... IIRC 即使在错误之前注册了一个,但我会在 AM 中再次检查。跨度>
    • 是的,我们确实调用了一般异常的错误处理程序(除了那些由监听器抛出的异常)。但是,Kafka 没有为我们提供有关哪个主题/分区/偏移量未能反序列化的信息。即使做到了,也很难看出我们能做什么。 poll() 将返回多条记录(默认最多 500 条)。即使只有一个主题/分区,错误处理程序也无能为力,因为我们不能跳过坏记录而不丢失早期的好记录。不幸的是,唯一的解决方案是“智能”反序列化器(或使用命令行工具重置偏移量)。
    • 啊,this.containerProperties.getGenericErrorHandler().handle(var11, (Object)null); 嗯...当。
    • 实际上...很抱歉一直在强调这一点,但 Avro 序列化程序实际上在错误消息中包含主题、分区和失败消息的偏移量...我需要正则表达式来解析它们但假设我可以,我是否能够寻求 1 过去,还是我仍然遇到批处理问题?
    • 您仍然会遇到批处理问题,除非您将max.poll.records 设置为 1(这会影响性能)。
    猜你喜欢
    • 2019-07-12
    • 2019-11-25
    • 2019-08-03
    • 2020-07-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-18
    • 2020-07-06
    相关资源
    最近更新 更多