session.timeout.ms 用于通过心跳机制检测消费者故障。消费者心跳线程必须在session.timeout.ms时间到期之前向代理发送心跳。否则,Kafka 认为消费者已经死亡并触发重新平衡。
heartbeat.interval.ms: 心跳到
使用 Kafka 的组管理设施时的消费者协调员。
心跳用于确保消费者的会话保持活动状态
并在新消费者加入或离开时促进再平衡
组。
session.timeout.ms: 用于检测客户端故障的超时时间
使用 Kafka 的组管理工具。客户端定期发送
心跳以向代理指示其活跃性。如果没有心跳
在此会话到期之前由经纪人收到
超时,则代理将从组中删除该客户端,并
启动重新平衡。
轮询是检查消费者健康状况的另一种机制。消费者应该调用 poll() 方法而不使 max.poll.interval.ms 过期。如果此时间到期(通常长时间运行的进程会导致此问题),消费者再次被视为死亡并触发重新平衡。
max.poll.interval.ms: poll() 调用之间的最大延迟
使用消费者组管理时。这设置了一个上限
消费者在获取更多信息之前可以空闲的时间量
记录。如果在此超时到期之前未调用 poll(),
然后消费者被认为是失败的,该组将重新平衡
为了将分区重新分配给另一个成员。
其他重要的一点是(从版本 0.10.1.0 开始):
rebalance.timeout = max.poll.interval.ms
由于我们给客户端最多 max.poll.interval.ms 来处理
批量记录,这也是消费者可以使用的最长时间
预计在最坏的情况下会重新加入该小组。因此我们
建议将Java客户端中的rebalance timeout设置为相同
使用 max.poll.interval.ms 配置的值。当重新平衡开始时,
后台线程将继续发送心跳。消费者
在处理完成并且用户之前不会重新加入组
调用 poll()。从协调者的角度来看,消费者将
在 1) 他们的会话超时之前不会从组中删除
没有收到心跳就过期了,或者 2) 重新平衡超时
过期了。
因此,在您的情况下,如果 session.timeout.ms 在没有消费者心跳的情况下过期,则在此消费者组中启动重新平衡。重新平衡开始后,消费者组中的所有消费者都被撤销,Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在此时发送 joinGroupRequest),直到重新平衡超时到期,等于max.poll.interval.ms。
在重新平衡期间,您仍然可以处理您已经拥有但无法提交的消息并通过此消息获得 CommitFailedException:
提交无法完成,因为该组已经重新平衡并且
将分区分配给另一个成员。这意味着时间
对 poll() 的后续调用之间的时间比配置的长
max.poll.interval.ms,这通常意味着轮询循环是
花费太多时间处理消息。您可以解决这个问题
通过增加会话超时或减少最大大小
使用 max.poll.records 在 poll() 中返回的批次。
更多信息您可以查看this。