【问题标题】:kafka consumer client in java can't reconnect to kubernetes kafka brokers after all of kafka-pods are upgraded升级所有 kafka-pod 后,java 中的 kafka 消费者客户端无法重新连接到 kubernetes kafka 代理
【发布时间】:2020-06-11 09:16:56
【问题描述】:

我使用 spring-kafka(2.2.4.RELEASE) 来使用来自 kafka-server 的消息。 kafka 客户端和服务器都部署在 k8s 集群中。 通常,在 kafka 代理上生产和消费消息是可以的。 但是当 kafka-brokers 升级时,kafka 客户端无法重新连接到 broker。

据我所知,当bootstrap-servers 是虚拟IP(detail is here)时,kafka 客户端重新连接存在错误。我的问题和vip bug一样。

在我的情况下,bootstrap-servers地址是k8s kafka服务名:port,当kafka-brokers升级时,对应的真实ip kafka 服务名称会改变。 所以kafka客户端永远不会重新连接成功。 我该如何解决这个问题?

环境

  • kubectl 版本
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:19:54Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:11:51Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
  • kafka 版本:kafka_2.12-2.3.1
  • kafka 部署信息:
> kubectl get svc -o wide -nbingotestdev|grep kafkadev
kafkadev                ClusterIP   None            <none>        9091/TCP                          1y        app=kafkadev
kafkadev-out            NodePort    10.68.206.93    <none>        9091:37142/TCP                    257d      app=kafkadev

> kubectl get pod -o wide -nbingotestdev|grep kafkadev
kafkadev-0                               1/1       Running             0          15h       172.20.10.59    10.171.113.45
kafkadev-1                               1/1       Running             0          15h       172.20.13.95    10.171.113.33
kafkadev-2                               1/1       Running             0          15h       172.20.2.173    10.171.113.62

  • kafka客户端配置:
    • 版本1:bootstrap-servers = kafkadev:9091
    • 版本2:bootstrap-servers = 10.68.206.93:9091
    • 当 kafka 服务器正常时,两者都能成功运行,而在 kafka 服务器 pod 升级后重新连接失败。

【问题讨论】:

  • 您如何准确地将客户端指向服务器?您使用无头服务还是其他服务?你的设置是什么?
  • application.propterties 中的@Nick 配置如下:bootstrap.servers=kafkadev:9091,并将传递给 DefaultKafkaConsumerFactory()

标签: java kubernetes apache-kafka kafka-consumer-api


【解决方案1】:

您必须确保您始终拥有一个静态分配的 IP 集,当消费者获取引导服务器时,无论是通过外部 DNS 服务还是使用 k8s api 客户端直接检查运行 Kafka 服务,然后获取所有地址以构建您的引导服务器字符串

【讨论】:

  • 如何获取a statically assigned IP set?据我所知,k8s 生成的 ip 是短暂的,并且在 pod 升级后总是会改变,尤其是对于有状态集。
  • 静态我主要是指经纪人 ID 不会改变。您是否查看过 Confluent Helm Charts 或 Strimzi 如何将经纪人展示给客户?
  • 我们两者都没有使用。我们从kafka_2.12-2.3.1 构建 kafka 映像,并将 kafka 服务作为有状态集。
  • 为什么要自己建造?我很确定 Strimzi Kafka 操作员支持 statefulset 部署
  • emm,我们想在安装的时候加一些acl。我想我无法从kafakdev 服务名称中获得a statically assigned IP set,即使我使用Strimzi Kafka。我认为关键是stateful deployments 为服务名称生成临时 ip。
猜你喜欢
  • 2022-07-07
  • 2020-04-02
  • 2014-06-22
  • 2016-08-31
  • 2019-03-14
  • 2017-08-07
  • 2015-09-18
  • 2017-04-03
  • 1970-01-01
相关资源
最近更新 更多