在之前的线程Here 中还解释了会话超时和最大轮询超时。我也解释一下我对此的理解。
ConsumerRecords 轮询(最终长时间超时):
用于从主题的分区中顺序获取数据,从上次使用的偏移量或手动设置的偏移量开始。如果有可用的记录,这将立即返回,否则它将等待通过的超时。如果超时通过将返回空记录。
轮询 API 不断调用以获取任何到达的新消息,并确保消费者的活跃度。在封面之下
session.timeout.ms 在每次轮询期间,消费者协调器向代理发送心跳,以确保消费者的会话处于活动状态。如果经纪人在 session.timeout.ms 经纪人之前没有收到任何心跳,那么经纪人离开该消费者并重新平衡
您可以假设 session.timeout.ms 是代理等待从消费者获取心跳的最长时间,而 heartbeat.interval.ms 是消费者假设向代理发送心跳的预期时间。
这就是为什么 heartbeat.interval.ms 总是小于 session.timeout.ms 因为理想情况下会话超时的 1/3。
max.poll.interval.ms :使用消费者组管理时调用 poll() 之间的最大延迟。这意味着在获取更多记录之前,消费者最大时间将处于空闲状态。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将通过调用 poll 重新平衡,以便将分区重新分配给另一个消费者实例.
如果我们正在进行长时间的批处理,最好增加 max.poll.interval.ms 但请注意增加这个值可能会延迟组重新平衡,因为消费者只会在轮询调用中加入重新平衡。我们可以通过调整 max.poll.records 来保持最大轮询间隔较低。
现在让我们讨论它们之间的关系。
消费者在调用 poll 时检查心跳,会话超时 poll 在后台超时,方式如下:
- 消费者协调器检查消费者是否处于重新平衡状态,如果仍在重新平衡,则等待协调器加入消费者。等待并调用 poll 。请注意,如果 max.poll.interval.ms 很大,重新平衡将需要更多时间。
在轮询和重新平衡完成后,协调器检查会话超时
如果会话超时过期而没有看到成功的心跳,旧的协调器将断开连接,因此下一次轮询将尝试重新平衡。
因此,如果会话超时消费者协调器本身死亡并且调用轮询必须在重新平衡之前分配新的协调器,则会话超时直接依赖于时间协调器的活跃度。
在会话超时后检查协调器验证 heartbeat.pollTimeoutExpired 是否轮询超时已过期,这意味着前台线程已在两次调用 poll() 之间停止,因此成员明确离开组并调用 poll 以加入新的消费者而不是整个消费者组协调员。
- 在会话超时和轮询超时验证之后以及发送心跳状态之前,消费者协调器会检查心跳超时,如果心跳超过最大延迟心跳时间,则暂停/等待以重试回退并再次轮询。
- 如果心跳时间也在限制内,则消费者协调器发送 sendHeartbeatRequest
- 如果 sendHeartbeatRequest 成功,线程将重置心跳时间并调用 poll,但如果失败且消费者组未处于重新平衡状态,它将唤醒消费者组协调器再次调用 poll。
正如在共享链接轮询中提到的那样,轮询与心跳无关,因此在轮询期间,如果轮询相当大的心跳仍然允许发送心跳,以确保您的线程处于活动状态,这意味着会话超时不会直接链接到轮询。
session.timeout.ms:接收心跳的最长时间
max.poll.interval.ms:独立处理线程的最大时间
因此,如果您将 max.poll.interval.ms 设置为 300,000,那么下一次轮询将有 300,000 毫秒,这意味着消费者线程最多有 300,000 毫秒来完成处理。在心跳之间将继续在 heartbeat.interval.ms 即 3,000 处发送心跳请求以指示线程仍然处于活动状态,以防在 session.timeout.ms 之前没有心跳,即 10,000 协调器将死亡并调用 poll 以重新分配新的协调器并重新平衡