【问题标题】:Updating Kafka Offset Immediately立即更新 Kafka 偏移量
【发布时间】:2018-10-17 07:59:17
【问题描述】:

我正在开发一个 Spring Boot 应用程序,该应用程序对推送到 Kafka 队列的消息做出反应。

版本为 Spring Boot 2.0.5,Finchley.SR1。

Kafka版本是kafka_2.12-1.1.0

我面临的问题是 有时 当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。

作为开发的一部分,我在一天中多次停止和启动应用程序,但没有看到相同的问题,只是偶尔出现。它也没有与应用程序中的错误相关联,因为所有处理都是干净的。

我已将我的 Kafka 侦听器配置为使用 MANUAL_IMMEDIATE 确认,并在侦听器方法结束时调用 ack.acknowledge()。

我的 Spring 属性文件如下所示:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    listener:
      ack-mode: MANUAL_IMMEDIATE
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      group-id: user-mgmt-app

我的 Listener 类定义如下:

@org.springframework.kafka.annotation.KafkaListener(topics = "aggregate-event-topic")
public void receive(ConsumerRecord<?, ?> cr, Acknowledgment ack) {

   ...
   ack.acknowledge();

}

我有一个应用实例正在运行,所以它每次都是消费者组中的领导者。

我使用 Kafka 工具查看了消费者组的偏移量,我注意到的一件事是,当我在确认步骤断点应用程序时,它并没有更新 CURRENT-OFFSET,它似乎只是更新了处理完所有消息后。

./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group user-mgmt-app --describe

我从其他帖子的理解是,MANUAL_IMMEDIATE 会在调用 acknowedge() 后立即更新 Kafka 服务器,而不是在批处理结束时。

我的理解有误吗?如果是这样的话,无论如何都可以获得我想要的功能(例如在每次从分区读取时将批处理大小设置为 1,我猜这可能会影响性能)。如果是这样,我该怎么做(感激地接受任何帮助!)

TIA

【问题讨论】:

  • enable-auto-commit 这样做,手动提交将在所有处理之后进行(例如,您需要处理超时和重新平衡)。
  • 如果我将“enable_auto_commit=true”设置为应用程序无法启动,并且据我所知“enable_auto_commit”仅在批处理结束时提交,则我无法使用 MANUAL_IMMEDIATE?所以我回到设置“max-poll-records=1”?
  • kafka 中的自动提交是在处理之前完成的,但可能会根据其配置而有所不同。由于您每次启动应用程序时都有auto-offset-reset: earliest,它将从头开始读取(根据您的 kafka 服务器的保留大小/时间)。无论如何,MANUAL_IMMEDIATE 应该立即提交,记住每个记录都会调用侦听器,要访问您需要将工厂定义为批处理的完整批处理,请参阅docs.spring.io/spring-kafka/reference/html/…(向下滚动)
  • 我发现 MANUAL_IMMEDIATE 不会立即更新,因此是我的问题。如果我将 poll-size 设置为 1,那么它是一致的。

标签: java spring spring-boot apache-kafka spring-kafka


【解决方案1】:

我面临的问题是,有时当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。

我猜您没有使用 2.0.0 代理,其中消费者偏移量的默认保留期从 24 小时增加到 7 天。较旧的代理仅在一天后使偏移量到期 - 如果您在周末没有消息,这是典型的问题。

Notable Changes in 2.0.0

KIP-186 将默认偏移保留时间从 1 天增加到 7 天。这使得它不太可能在不经常提交的应用程序中“丢失”偏移量。它还增加了活动的偏移量集,因此可以增加代理的内存使用量。请注意,控制台使用者当前默认启用偏移提交,并且可能是大量偏移的来源,此更改现在将保留 7 天而不是 1 天。您可以通过设置代理配置 offsets.retention 来保留现有行为。分钟到 1440。

我不确定您为什么没有通过命令行工具看到偏移量更新。 AckMode.RECORD 将在每条记录后更新偏移量。只要 Spring Kafka 版本 >= 1.3,MANUAL_IMMEDIATE 就会在您调用 acknowledge() 时更新(Boot 2.0.x 将引入 Spring Kafka 2.0.x)。

【讨论】:

  • 我刚刚用ack-mode=record 进行了测试,我发现kafka-consumer-groups ... --describe 中的偏移量保持得很好。
  • 谢谢加里,根据我所看到的,这是有道理的。您提到不使用 2.0.0 代理 - /kafka/libs 下的 jar 被命名为 kafka_2.12-1.1.0 所以这不意味着我使用的是足够晚的版本吗?我已经检查了 server.properties 并且它没有“offsets.retention.minutes”的属性,所以我会尝试添加它并看看会发生什么。我还将使用命令行工具运行一些测试,以确保它们没有像我提到的那样更新,并会按照您的建议检查 Spring Boot 版本,
  • 2.12是Scala版本; 1.1.0 是代理版本。最新的代理版本是 2.0.0(增加了默认保留)。如果该属性不存在,则使用默认值;请参阅the 1.1.0 docs,默认为 1440 分钟。
  • 好的,谢谢您的信息,我会尝试更高版本的。
  • 刚刚更新到 Kafka 2.0.0 并明确将“offsets.retention.minutes”属性设置为 2 周(只是为了确定!),并将在周末后的星期一查看它的表现。我使用 MANUAL_IMMEDIATE 的命令行再次检查了偏移量,它似乎仅在收到第一条消息后才更新偏移量(后续消息与列出的偏移量保持同步)。当我将其更改为 RECORD(当然,在我的侦听器代码中删除了 Acknowledge 参数)时,它似乎 100% 一致,因此将切换到该方法,看看它是如何进行的。再次感谢您的帮助加里!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-25
  • 1970-01-01
  • 1970-01-01
  • 2020-06-04
  • 2017-11-25
相关资源
最近更新 更多