【问题标题】:Kafka continuous rebalancing when slow consumer当消费者缓慢时,Kafka 持续重新平衡
【发布时间】:2019-06-12 11:56:18
【问题描述】:

我一直在尝试测试我们的 Kafka 是否有一些负面情况,其中一个是非常慢的消费者。我在我的@KafkaListener 方法中设置了Thread.sleep(15000)(它是spring-kafka)并将并发设置为3。我有1 个主题和1 个分区。 我将 10 条消息放入主题并启动服务。 当3个消费者开始时,他们都到达(Re-)joining group点, 但是只有其中一个(假设它是consumer-2)会到达:

Successfully joined group with generation X

然后开始慢慢消费消息。

(顺便说一句,我使用 MANUAL_IMMEDIATE Ack 模式,但即使我不向侦听器添加 Acknowledgement 参数并且不确认消息,它也是可重现的)。 我接下来看到的内容如下: 直到消费者 2 处理所有消息,每 3 秒(默认心跳间隔)我在控制台中收到一条消息:

AbstractCoordinator$HeartbeatResponseHandler: [Consumer clientId=consumer-2, groupId=pixel-group] Attempt to heartbeat failed since group is rebalancing

我想知道为什么会这样。只有在所有 10 条消息都是进程之后,才会有另一个重新平衡,之后所有 3 个消费者都将打印:

Successfully joined group with generation X

其中一个将被分配一个分区,不再有心跳问题。 这只发生在我将睡眠间隔设置为高于心跳间隔的值时。它通常在所有消费者都启动时发生一次,但很快就会成功设置。

所以,总结起来似乎是:

如果消费者处理时间> 心跳间隔时间 - 除了第一个消费者之外的所有消费者都无法完成重新平衡(他们可能无法与他们缓慢的领导者交谈)。 我不明白为什么这个心跳错误如此持久? 如果睡眠时间比心跳时间长,为什么其他消费者不能在领导者的消息消费之间完成重新平衡?

更新 卡夫卡版本 2.12-2.2.0 Spring-Kafka 2.2.3.RELEASE

【问题讨论】:

  • 嗨,这里有关于再平衡协议的很好的讨论。正如 Gary 下面提到的,如果您想正确扩展,您需要将主题的分区与您的实例对齐。会谈:youtube.com/watch?v=MmLezWRI3Ysconfluent.io/kafka-summit-lon19/… 一些文档:改进的 kip:cwiki.apache.org/confluence/display/KAFKA/…
  • 感谢您的链接。我明白我需要使两者保持一致。但我也想了解消费者如何在重新平衡和不同的超时选项方面工作。正如我在实验中解释的那样,它提出了一些挑战,但没有明显的答案。
  • 是的,我明白了。在那些谈话中,他们详细介绍了消费者在重新平衡和不同配置选项方面的工作方式,所以如果你有时间,我强烈建议你观看它们。也许高速到你觉得有趣的地步你的情况..但那里有关于你的问题的非常有价值的信息。为了在这里获得更好的答案,我建议您发布代码和配置。

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

...并发到 3。我有 1 个主题和 1 个分区...

您至少需要与消费者一样多的分区 - 一个分区只能由一个消费者使用。

你用的是什么版本?从 KIP-62 (Kafka 0.10.1.0) 开始,心跳由 kafka-clients 在后台发送。因此,仅当侦听器花费的时间超过max.poll.interval.ms 时,才应进行重新平衡。谷歌 KIP-62 了解更多信息。

编辑

当你的听众正在睡觉时,你应该会看到这样的日志......

2019-06-13 09:47:52.008 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Heartbeat thread started
...
2019-06-13 09:47:52.072  INFO 61914 --- [           main] com.example.Rbgh664Application           : Sleeping for 15
2019-06-13 09:47:55.120 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:47:55.121 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 10 to node 2147483647
2019-06-13 09:47:55.226 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 10, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:47:55.227 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:47:58.120 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:47:58.120 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 11 to node 2147483647
2019-06-13 09:47:58.225 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 11, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:47:58.226 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:48:01.203 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:48:01.204 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 12 to node 2147483647
2019-06-13 09:48:01.310 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 12, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:48:01.310 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
2019-06-13 09:48:04.285 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Sending Heartbeat request to coordinator localhost:9092 (id: 2147483647 rack: null)
2019-06-13 09:48:04.286 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Sending HEARTBEAT {group_id=rbgh664,generation_id=45,member_id=source-82297cee-063a-4e0d-89d0-15cfbd6ef680} with correlation id 13 to node 2147483647
2019-06-13 09:48:04.390 TRACE 61914 --- [hread | rbgh664] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=source, groupId=rbgh664] Completed receive from node 2147483647 for HEARTBEAT with correlation id 13, received {throttle_time_ms=0,error_code=0}
2019-06-13 09:48:04.390 DEBUG 61914 --- [hread | rbgh664] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=source, groupId=rbgh664] Received successful Heartbeat response
20
...

【讨论】:

  • 对不起,将添加版本。确切地说,心跳是在一个单独的线程中,这使得它更加奇怪。我使用最新的 Kafka。
  • 如果侦听器等待 15 秒,则不应发生重新平衡; max.poll.interval.ms 默认为 300000。打开 TRACE 日志,看看它是否能给你更多的线索。
  • 加里,你能详细说明一下吗?我确实从一开始就打开了 TRACE 日志记录,现在没有任何线索。 “不应该发生再平衡”是什么意思?当我启动服务并且除了第一个消费者之外的所有人都无法完成时,就会发生这种情况。
  • 查看我的答案的编辑;您应该会看到当您的消费者没有响应时让消费者保持活动状态的那些心跳消息。
猜你喜欢
  • 1970-01-01
  • 2018-05-23
  • 2017-06-19
  • 1970-01-01
  • 1970-01-01
  • 2020-08-04
  • 2021-01-26
  • 2022-10-05
  • 2020-06-30
相关资源
最近更新 更多