【问题标题】:How Spring Kafka Consumer skips from Avro Deserializer exceptionSpring Kafka Consumer 如何从 Avro Deserializer 异常中跳过
【发布时间】:2019-12-15 05:20:21
【问题描述】:

我正在使用 Spring Kafka 消费者和 Avro 模式来构建我的应用程序。

但是,如果无法将消息反序列化为我构建的指定 Avro 特定记录,则消费者将不断重试消费者相同的消息(无限重试)。

对于这种情况,如果我的消费者发生反序列化程序异常,我如何配置消费者应用程序以跳过当前消息并移动到下一个偏移量。

我查看了 Spring Kafka 错误句柄,它只能在侦听器中处理异常,而不是在反序列化阶段。

我的消费者应用程序非常简单:

@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }

基于此代码,有时接收到的消息可能不会反序列化为正确的Customer 对象。

另外,我看到最近的一个解决方案是使用 Spring Kafka 的 ErrorHandlingDeserializer2 来处理这个问题,但是由于我使用的是 KafkaAvroDeserializer,我该如何解决这些配置?我目前的配置是:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

【问题讨论】:

    标签: java spring apache-kafka spring-kafka


    【解决方案1】:

    您需要将当前值 + 键解串器都设置为 ErrorHandlingDeserializer.class 并将当前值 + 键解串器设置为 ErrorHandlingDeserializer 键/值解串器属性

    这看起来类似于:

    ... // other props
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
    

    【讨论】:

      【解决方案2】:

      我是explained in the documentation

      您通过自定义 Spring 属性将反序列化器设置为错误处理反序列化器及其委托。

      您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数采用键和值 Deserializer 对象,并连接到您已使用适当的委托配置的适当 ErrorHandlingDeserializer2 实例。或者,您可以使用使用者配置属性(由 ErrorHandlingDeserializer 使用)来实例化委托。属性名称为 ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例显示了如何设置这些属性:

      ... // other props
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
      props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
      props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
      props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
      props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
      props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
      return new DefaultKafkaConsumerFactory<>(props);
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2023-03-09
        • 2019-04-19
        • 1970-01-01
        • 1970-01-01
        • 2021-12-28
        • 2020-12-13
        • 2019-10-11
        相关资源
        最近更新 更多