【问题标题】:How to skip an Avro serialization exception in KafkaStreams API?如何在 KafkaStreams API 中跳过 Avro 序列化异常?
【发布时间】:2020-01-25 08:02:22
【问题描述】:

我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并做一些与我的问题无关的事情。问题是一个特定的行在 avro 的反序列化中产生错误。我可以深入研究 Avro 模式文件并找到问题,但总的来说,我需要的是一个宽容的异常处理程序,在遇到此类错误时不会导致整个应用程序停止。 这是我的流应用程序的主要部分:

StreamsBuilder streamsBuilder = watchForCourierUpdate(builder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    private static StreamsBuilder watchForCourierUpdate(StreamsBuilder builder){
        CourierUpdateListener courierUpdateListener = new CourierUpdateListener(builder);
        courierUpdateListener.start();
        return builder;
    }

    private static Properties configProperties(){

        Properties streamProperties = new Properties();

        streamProperties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Configs.getConfig("schemaRegistryUrl"));
        streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "courier_app");
        streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Configs.getConfig("bootstrapServerUrl"));
        streamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        streamProperties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/state_dir");
        streamProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
        streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        streamProperties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                CourierSerializationException.class);

        return streamProperties;

    }

这是我的 CourierSerializationException 类:

public class CourierSerializationException implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> producerRecord, Exception e) {
        Logger.logError("Failed to de/serialize entity from " + producerRecord.topic() + " topic.\n" + e);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

不过,无论何时发生 avro 反序列化异常,流都会关闭并且应用程序不会继续。我是不是错过了什么!

【问题讨论】:

  • 你得到的异常到底是什么?
  • @cricket_007 我记不太清了,但是像预期的那样很久了。谢谢你,碰巧我所有与kafka相关的问题都由你回答:)
  • 根据我的经验,该错误意味着您的架构错误。但是您使用的是通用 avro,所以除非有人破坏了注册表中的架构,否则应该没问题
  • @cricket_007 是的,事实上我的架构是错误的,并且与 debezium 生成的架构略有不同。我的问题是为什么我所谓的宽容异常处理程序仍然停止应用程序?

标签: java apache-kafka avro apache-kafka-streams


【解决方案1】:

您是否尝试过使用 kafka 提供的 default.deserialization.exception.handler 来做到这一点?您可以使用 LogAndContinueExceptionHandler 它将记录并继续。

我可能错了,但我认为通过实现 ProductionExceptionHandler 创建 Customexception 仅适用于 kafka 端的网络相关错误。

将此添加到属性中,看看会发生什么:

> props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

【讨论】:

    猜你喜欢
    • 2019-12-15
    • 1970-01-01
    • 2021-03-11
    • 1970-01-01
    • 2017-11-08
    • 2016-11-07
    • 1970-01-01
    • 2020-02-11
    • 2017-03-28
    相关资源
    最近更新 更多