【问题标题】:How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?如何跳过 Spring Kafka Consumer 中的损坏(不可序列化)消息?
【发布时间】:2019-04-19 01:26:38
【问题描述】:

这个问题是针对Spring Kafka的,与Apache Kafka with High Level Consumer: Skip corrupted messages相关

有没有办法配置 Spring Kafka 消费者跳过无法读取/处理(损坏)的记录?

我看到如果无法反序列化,消费者会卡在同一记录上。这是消费者抛出的错误。

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

消费者轮询主题并在循环中不断打印相同的错误,直到程序被杀死。

在具有以下 Consumer 工厂配置的 @KafkaListener 中,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

【问题讨论】:

  • 我相信消费者从未敲过一条记录,如果您多次看到该错误消息,则意味着它发生在多条记录中
  • 你能更新 JSON 负载和模型映射器类吗?
  • @Deadpool 消费者卡在同一条消息上。我通过日志和消费者组的偏移量验证了它。我解决了 JSON 序列化错误,但我的问题的目的是找到一种完全跳过此消息的方法。

标签: java apache-kafka spring-kafka


【解决方案1】:

你需要ErrorHandlingDeserializerhttps://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer

如果您无法迁移到该 2.2 版本,请考虑实现您自己的版本并为无法正确反序列化的记录返回 null

源码在这里:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

【讨论】:

  • 您也可以使用kafka-consumer-groups.sh 来推进过去的不良记录。请参阅--shift-by 选项。但ErrorHandlingDeserializer 旨在在生产应用程序中处理此问题。
  • ErrorHandlingDeserializer 已被弃用,您现在可以以同样的方式使用ErrorHandlingDeserializer2
  • 这怎么能与KafkaAvroDeserializer的Avro一起使用?谢谢。
  • 请提出一个新的 SO 问题
【解决方案2】:

如果您使用的是旧版本的 kafka,请在 @KafkaListener 中设置以下消费者工厂配置。

 Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

这是 CustomDeserializer 的代码:

 import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    public class CustomDeserializer implements Deserializer<Object>
    {
        @Override
        public void configure( Map<String, ?> configs, boolean isKey )
        {
        }

        @Override
        public Object deserialize( String topic, byte[] data )
        {
            ObjectMapper mapper = new ObjectMapper();
            Object object = null;
            try
            {
                object = mapper.readValue(data, Object.class);
            }
            catch ( Exception exception )
            {
                System.out.println("Error in deserializing bytes " + exception);
            }
            return object;
        }

        @Override
        public void close()
        {
        }
    }

因为我希望我的代码足够通用以读取任何类型的 json, object = mapper.readValue(data, Object.class);我正在将其转换为 Object.class。由于我们在这里捕获异常,因此一旦读取就不会重试。

【讨论】:

    猜你喜欢
    • 2020-07-10
    • 2015-12-30
    • 2017-02-15
    • 2019-07-14
    • 2019-12-15
    • 2018-10-21
    • 1970-01-01
    • 2019-09-01
    相关资源
    最近更新 更多