【发布时间】:2020-04-29 23:07:33
【问题描述】:
背景:我有一个在 Kubernetes 中运行的应用程序,它使用 Kafka 作为集中式消息总线。我的应用程序中的 Kafka 客户端可能非常慢。在将成员踢出组并重新平衡之前,Kafka 代理在后续 poll() 调用之间等待的最长时间由 max.poll.interval.ms 控制。
对于此应用程序中的大多数工作人员,我可以将 max.poll.interval.ms 设置为大约几分钟的时间。但是,对于上课速度较慢的工人,我需要将其设置为几个小时。
当一切正常时,这不会导致问题。但是,在网络中断或间歇性崩溃的情况下,我注意到具有非常大的最大轮询间隔的工作人员可能会在重新平衡中“卡住”。如果我在发生这种情况时查看代理,并执行类似
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe --members
然后我看到代理正在等待一群不再存在的工作人员(我确定是这种情况,因为我将 group.instance.id 设置为 Kubernetes pod 主机名,所以我可以验证卡住的组成员是否真的消失了)。
通过this question,我看到 KIP-266 说“JoinGroup API 将被视为特殊情况,其超时将设置为派生自 max.poll.interval.ms 的值。”然后,我猜正在发生的事情是,我的工作人员在以某种方式与代理断开连接之前正在发送一个 JoinGroup,这导致代理在将它们标记为死之前等待完整的 max.poll.interval.ms 并允许重新平衡新员工。
当这种情况发生时,似乎我必须关闭 Kafka 代理并将它们重新启动以清除死去的成员......否则所有处理都会在代理等待期间卡住几个小时完全超时。这些都是不好的解决方案,我都不满意。
我的问题是: 是否可以调整设置以鼓励 Kafka 在放弃 JoinGroup 请求之前等待的时间少于max.poll.interval.ms?如果这意味着在网络中断后重新平衡有一点混乱,因为非常慢的消费者很晚才重新加入该组,那么我可以接受。如果没有这样的机制,我应该如何重构我的系统以避免我看到的问题?
我正在运行 Confluent Kafka confluentinc/cp-kafka:5.4.1,它似乎是 Kafka 2.4.0。
【问题讨论】:
标签: apache-kafka