【发布时间】:2019-02-01 12:02:59
【问题描述】:
我已经使用KafkaHandler 实现了一个 Kafka 消费者。我的消费者应该消费事件,然后为每个事件向其他服务发送一个 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
我使用ExceptionClassifierRetryPolicy 来设置异常和相应的重试策略。
重试后一切正常。当我收到ConnectException 时它会重试,当我收到IllegalArgumentException 时它会忽略。
然而,在IllegalArgumentException 场景中,SeekToCurrentErrorHandler 会回溯到未处理的偏移量(因为它会回溯包括失败消息在内的未处理消息),最终会立即重试失败的消息。消费者不断来回重试百万次。
如果我有机会了解SeekToCurrentErrorHandler 中的哪条记录失败,那么我将创建SeekToCurrentErrorHandler 的自定义实现来检查失败的消息是否可重试(通过使用thrownException 字段)。如果它不可重试,那么我会将其从records 列表中删除以进行查找。
关于如何实现此功能的任何想法?
注意:enable.auto.commit 设置为 false,auto.offset.reset 设置为 earliest。
谢谢!
【问题讨论】:
-
为什么将 auto.offset.reset 设置为最早?我认为它应该在 PROD env 上设置为最新。
-
我只有 1 个消费组,所以没关系。您可以查看this 以获得更好的解释。
标签: apache-kafka spring-kafka spring-retry