【问题标题】:Spring Kafka and auto ack timeoutSpring Kafka和自动确认超时
【发布时间】:2017-12-10 19:24:06
【问题描述】:

我使用 Kafka 消息代理和 Spring Kafka。

我使用默认的自动确认模型。有时我的@KafkaListener 需要大约 5 分钟才能完成工作。

我注意到当它发生时,Kafka 会产生重复的消息。 是否可以配置任何超时属性以防止 Kafka 重复相同的消息并让 Producer 等待(至少例如 10 分钟),而 Consumer 将完成工作?

【问题讨论】:

  • 生产者和消费者是分离的——没有内置的方法可以阻止生产者发送更多消息,因为消费者很慢。您需要显示更多信息 - spring-kafka 的哪个版本,kafka 的哪个版本等;日志也会有帮助。我一般避免auto.commit;通常最好让弹簧容器管理偏移量。
  • 谢谢。我使用 Kafka 1.0.0 和 Spring Kafka 2.0.0.RELEASE。我可能错了,但是当消费者长时间工作时,生产者(或 Kafka 本身)似乎重复了相同的消息。这种情况下是否有任何超时属性,以便让 Consumer 工作至少 10 分钟,并且 Kafka 在此之前不会重复相同的消息?
  • 另外,我没有明确使用 auto.commit。我使用默认的 Spring 配置

标签: apache-kafka spring-kafka


【解决方案1】:

生产者和消费者是分离的——因为消费者很慢,所以无法阻止生产者发送更多消息。

我没有明确使用 auto.commit。我使用默认的 Spring 配置

auto.commit 默认为true(在 Kafka 客户端中),因此 spring 下的默认行为是客户端执行自己的提交而不是容器。将其设置为false 并将容器属性AckMode 设置为RECORD,以便在侦听器退出后执行提交。

有一个 kafka consumer property max.poll.interval.ms 默认为 5 分钟 (300000)。

使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。

重新平衡将导致重新交付。您可以增加该属性以避免重新平衡。

正如我所说,日志通常会有所帮助;您应该会在日志中看到重新平衡。

【讨论】:

  • 非常感谢您的详细解答!我需要一些时间来测试它。在那之后,我会接受答案。
  • >I use Kafka 1.0.0 and Spring Kafka 2.0.0.RELEASE. - 顺便说一句,2.1.0.RELEASE 本机使用 1.0.0 并明确发布这样做;它将传递地引入 1.0.0。我们的规则不允许我们更改客户端版本,并且 2.0.0.RELEASE(现在为 2.0.2)在 1.0.0 可用之前已经发布以使用 0.11.0.x 客户端。
猜你喜欢
  • 2020-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-15
  • 2017-04-29
  • 2017-07-03
  • 1970-01-01
相关资源
最近更新 更多