【发布时间】:2019-12-15 05:20:21
【问题描述】:
我正在使用 Spring Kafka 消费者和 Avro 模式来构建我的应用程序。
但是,如果无法将消息反序列化为我构建的指定 Avro 特定记录,则消费者将不断重试消费者相同的消息(无限重试)。
对于这种情况,如果我的消费者发生反序列化程序异常,我如何配置消费者应用程序以跳过当前消息并移动到下一个偏移量。
我查看了 Spring Kafka 错误句柄,它只能在侦听器中处理异常,而不是在反序列化阶段。
我的消费者应用程序非常简单:
@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
public void process(ConsumerRecord<String, Customer> record) {
LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
}
基于此代码,有时接收到的消息可能不会反序列化为正确的Customer 对象。
另外,我看到最近的一个解决方案是使用 Spring Kafka 的 ErrorHandlingDeserializer2 来处理这个问题,但是由于我使用的是 KafkaAvroDeserializer,我该如何解决这些配置?我目前的配置是:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
【问题讨论】:
标签: java spring apache-kafka spring-kafka