【发布时间】:2018-10-17 07:59:17
【问题描述】:
我正在开发一个 Spring Boot 应用程序,该应用程序对推送到 Kafka 队列的消息做出反应。
版本为 Spring Boot 2.0.5,Finchley.SR1。
Kafka版本是kafka_2.12-1.1.0
我面临的问题是 有时 当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。
作为开发的一部分,我在一天中多次停止和启动应用程序,但没有看到相同的问题,只是偶尔出现。它也没有与应用程序中的错误相关联,因为所有处理都是干净的。
我已将我的 Kafka 侦听器配置为使用 MANUAL_IMMEDIATE 确认,并在侦听器方法结束时调用 ack.acknowledge()。
我的 Spring 属性文件如下所示:
spring:
kafka:
bootstrap-servers: kafka:9092
listener:
ack-mode: MANUAL_IMMEDIATE
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
group-id: user-mgmt-app
我的 Listener 类定义如下:
@org.springframework.kafka.annotation.KafkaListener(topics = "aggregate-event-topic")
public void receive(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
...
ack.acknowledge();
}
我有一个应用实例正在运行,所以它每次都是消费者组中的领导者。
我使用 Kafka 工具查看了消费者组的偏移量,我注意到的一件事是,当我在确认步骤断点应用程序时,它并没有更新 CURRENT-OFFSET,它似乎只是更新了处理完所有消息后。
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group user-mgmt-app --describe
我从其他帖子的理解是,MANUAL_IMMEDIATE 会在调用 acknowedge() 后立即更新 Kafka 服务器,而不是在批处理结束时。
我的理解有误吗?如果是这样的话,无论如何都可以获得我想要的功能(例如在每次从分区读取时将批处理大小设置为 1,我猜这可能会影响性能)。如果是这样,我该怎么做(感激地接受任何帮助!)
TIA
【问题讨论】:
-
enable-auto-commit这样做,手动提交将在所有处理之后进行(例如,您需要处理超时和重新平衡)。 -
如果我将“enable_auto_commit=true”设置为应用程序无法启动,并且据我所知“enable_auto_commit”仅在批处理结束时提交,则我无法使用 MANUAL_IMMEDIATE?所以我回到设置“max-poll-records=1”?
-
kafka 中的自动提交是在处理之前完成的,但可能会根据其配置而有所不同。由于您每次启动应用程序时都有
auto-offset-reset: earliest,它将从头开始读取(根据您的 kafka 服务器的保留大小/时间)。无论如何,MANUAL_IMMEDIATE 应该立即提交,记住每个记录都会调用侦听器,要访问您需要将工厂定义为批处理的完整批处理,请参阅docs.spring.io/spring-kafka/reference/html/…(向下滚动) -
我发现 MANUAL_IMMEDIATE 不会立即更新,因此是我的问题。如果我将 poll-size 设置为 1,那么它是一致的。
标签: java spring spring-boot apache-kafka spring-kafka