【问题标题】:Can Kafka be asked to wait less than max.poll.interval.ms during a JoinGroup?可以要求 Kafka 在 JoinGroup 期间等待少于 max.poll.interval.ms 吗?
【发布时间】:2020-04-29 23:07:33
【问题描述】:

背景:我有一个在 Kubernetes 中运行的应用程序,它使用 Kafka 作为集中式消息总线。我的应用程序中的 Kafka 客户端可能非常慢。在将成员踢出组并重新平衡之前,Kafka 代理在后续 poll() 调用之间等待的最长时间由 max.poll.interval.ms 控制。

对于此应用程序中的大多数工作人员,我可以将 max.poll.interval.ms 设置为大约几分钟的时间。但是,对于上课速度较慢的工人,我需要将其设置为几个小时。

当一切正常时,这不会导致问题。但是,在网络中断或间歇性崩溃的情况下,我注意到具有非常大的最大轮询间隔的工作人员可能会在重新平衡中“卡住”。如果我在发生这种情况时查看代理,并执行类似

kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe --members

然后我看到代理正在等待一群不再存在的工作人员(我确定是这种情况,因为我将 group.instance.id 设置为 Kubernetes pod 主机名,所以我可以验证卡住的组成员是否真的消失了)。

通过this question,我看到 KIP-266 说“JoinGroup API 将被视为特殊情况,其超时将设置为派生自 max.poll.interval.ms 的值。”然后,我猜正在发生的事情是,我的工作人员在以某种方式与代理断开连接之前正在发送一个 JoinGroup,这导致代理在将它们标记为死之前等待完整的 max.poll.interval.ms 并允许重新平衡新员工。

当这种情况发生时,似乎我必须关闭 Kafka 代理并将它们重新启动以清除死去的成员......否则所有处理都会在代理等待期间卡住几个小时完全超时。这些都是不好的解决方案,我都不满意。

我的问题是: 是否可以调整设置以鼓励 Kafka 在放弃 JoinGroup 请求之前等待的时间少于max.poll.interval.ms?如果这意味着在网络中断后重新平衡有一点混乱,因为非常慢的消费者很晚才重新加入该组,那么我可以接受。如果没有这样的机制,我应该如何重构我的系统以避免我看到的问题?

我正在运行 Confluent Kafka confluentinc/cp-kafka:5.4.1,它似乎是 Kafka 2.4.0。

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    在 Kafka 中,当重新平衡在消费者组中开始时,该消费者组中的所有消费者都被撤销,并且 Kafka 等待所有活着的消费者(发送心跳的消费者)到 poll()(为撤销的消费者调用 poll 意味着 JoinGroupRequest)。重要的是:

    rebalance timeout = max.poll.interval.ms
    

    而且这个无法更改。实际上这是合理的,因为 Kafka 会等待活着的消费者完成其工作并重新加入群组。所以当所有活着的消费者发送 joinGroupRequests 或发生再平衡超时时,再平衡就完成了。

    在rebalance期间,因为consumer group中的所有consumer都被撤销,所以这个consumer group的消费操作停止。因此,作为一种良好做法,应避免长时间运行的进程。

    结果:

    长时间运行的进程领先 -> 较长的 max.poll.interval.ms 时间领先 -> 较长的重新平衡时间

    【讨论】:

    • 感谢您的回答。我认为您是对的,但我不同意这是 Kafka 开发人员的合理选择。我不明白为什么 Kafka 无法配置为在短暂超时后完成初始重新平衡,然后,如果有其他消费者稍后加入,请再次重新平衡。
    • @monoi 实际上我认为在最新版本中你提到的东西已经发展起来。 (我还没有测试过,但我想它可以满足你对重新平衡的期望)。你可以查看thisthis
    【解决方案2】:

    我还没有解决这个问题(而且似乎没有解决方案),但我可能已经找到了一些改进的方法:将 group.instance.id 设置为 Kubernetes 主机名,并在 Kubernetes 中使用 StatefulSets,以便特定工作人员的主机名是稳定的。这样,当一个 worker 崩溃并重新加入时,希望 Kafka 能够将其识别为 同一个 worker,而不是等待幽灵。

    【讨论】:

      【解决方案3】:

      我的最终解决方案是迁移到 Apache Pulsar。

      Pulsar 允许单独确认消息,从而解决了问题。

      【讨论】:

        猜你喜欢
        • 2019-10-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-12-30
        • 2020-05-12
        • 2020-05-19
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多