【Question title】: Kafka Connect: Wasn't unable to resume work after last rebalance
【Posted】: 2020-09-02 18:14:19
【Question description】:

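I have Kafka Connect (version 2.1.1-cp1) working with Kafka (2.0.1-cp4), running about 70 connectors under a very heavy workload. Occasionally (every 2-3 weeks), some nodes suddenly start producing the following logs and stop working: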

INFO [Worker clientId=connect-1, groupId=dwh-prod] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Worker clientId=connect-1, groupId=dwh-prod] Successfully joined group with generation 2288 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-8532b028-281c-4aca-8440-c4c999812158', leaderUrl='http://10.36.3.136:8083/', offset=2811, connectorIds=[<redacted>]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 is behind group assignment 2811, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 does not match group assignment 2811. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

I can see that something may be going wrong when getting the configState (its offset) from configBackingStore, but I don't know why: https://github.com/apache/kafka/blob/2.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L823
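The log sequence above follows the post-rebalance check in DistributedHerder. As a rough illustration only (a simplified Python model, not Kafka's code), the decision for a successful assignment looks like this. `after_rebalance` and `read_config_to_end` are hypothetical names; the latter stands in for reading the config topic to its end and returning the new snapshot offset, or `None` if the read timed out.

```python
from typing import Callable, Optional, Tuple


def after_rebalance(
    config_offset: int,
    assignment_offset: int,
    read_config_to_end: Callable[[], Optional[int]],
) -> Tuple[int, str]:
    """Simplified model of a worker's decision after joining the group.

    Returns the worker's config offset afterwards and the next action,
    either "start_work" or "rejoin".
    """
    if config_offset < assignment_offset:
        # "Catching up to assignment's config offset." -> read the config topic to its end
        refreshed = read_config_to_end()
        if refreshed is None:
            # Timed out reading the config topic: give up this round and rejoin
            return config_offset, "rejoin"
        config_offset = refreshed
    if config_offset != assignment_offset:
        # "Current config state offset X does not match group assignment Y. Forcing rebalance."
        return config_offset, "rejoin"
    return config_offset, "start_work"


# The situation from the logs: still at -1 after reading to the end.
print(after_rebalance(-1, 2811, lambda: -1))    # (-1, 'rejoin')
# A worker that catches up to the assignment offset can start its work.
print(after_rebalance(-1, 2811, lambda: 2811))  # (2811, 'start_work')
```

In this model, a worker whose snapshot stays at -1 after reading to the end can never match the assignment offset, so every round ends in another forced rebalance, which fits the repeating logs. Since -1 appears to be the initial offset of an empty config snapshot, this suggests the affected worker read no records from the config topic at all.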

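It seems to start when new nodes are spawned (it is hosted in Kubernetes with a vertical pod autoscaler) and then continues indefinitely, for hours.

The setup runs on K8S with confluentinc/cp-kafka-connect: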

 - name: CONNECT_LOG4J_ROOT_LOGLEVEL
   value: INFO
 - name: CONNECT_BOOTSTRAP_SERVERS
   value: ***:9092
 - name: CONNECT_ZOOKEEPER_CONNECT
   value: ***:2181
 - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
   value: http://***:8081
 - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
   value: http://***:8081
 - name: CONNECT_GROUP_ID
   value: dwh-prod
 - name: CONNECT_STATUS_STORAGE_TOPIC
   value: dwh-prod-status
 - name: CONNECT_CONFIG_STORAGE_TOPIC
   value: dwh-prod-configs
 - name: CONNECT_OFFSET_STORAGE_TOPIC
   value: dwh-prod-offsets
 - name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
   value: "10000"
 - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
   value: "1"
 - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
   value: "1"
 - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
   value: "1"
 - name: CONNECT_REST_ADVERTISED_HOST_NAME
   valueFrom:
     fieldRef:
       fieldPath: status.podIP
 - name: CONNECT_REST_PORT
   value: "8083"
 - name: CONNECT_PLUGIN_PATH
   value: /usr/share/java,/usr/share/confluent-hub-components,/usr/share/landoop-plugins
 - name: CONNECT_INTERNAL_KEY_CONVERTER
   value: org.apache.kafka.connect.json.JsonConverter
 - name: CONNECT_INTERNAL_VALUE_CONVERTER
   value: org.apache.kafka.connect.json.JsonConverter
 - name: CONNECT_KEY_CONVERTER
   value: io.confluent.connect.avro.AvroConverter
 - name: CONNECT_VALUE_CONVERTER
   value: io.confluent.connect.avro.AvroConverter
 - name: CONNECT_LOG4J_LOGGERS
   value: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
 - name: CLASSPATH
   value: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar

Any ideas or hints?

【Question comments】:

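  • Could you share the worker.properties you start the worker with? Did you create the offsets topic manually, or did you rely on Connect to create it? You may have created the internal Connect topics with the wrong compaction policy.
  • I run it on K8S using the confluentinc/cp-kafka-connect image. I've pasted the configuration above; I hope that answers your question.
  • How many Kafka brokers do you have?
  • There are 3 brokers.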
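The cleanup-policy concern in the first comment can be checked mechanically. Connect expects its internal topics to be compacted; with `delete` (or `compact,delete`), records older than the topic's retention can be removed, connector configurations included. A minimal sketch, assuming you have already gathered each topic's effective configs into a dict by whatever means you prefer; `non_compacted_topics` and the sample data are hypothetical, and only the topic names come from the question's config.

```python
from typing import Dict, List

# Internal topic names taken from the question's CONNECT_*_STORAGE_TOPIC settings
INTERNAL_TOPICS = ("dwh-prod-configs", "dwh-prod-offsets", "dwh-prod-status")


def non_compacted_topics(topic_configs: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the internal topics whose cleanup.policy is not exactly 'compact'.

    topic_configs maps topic name -> effective topic configs (hypothetical input
    shape). 'delete' and 'compact,delete' are both flagged because both let
    retention remove records. Missing topics or policies are flagged as well,
    since their policy cannot be confirmed.
    """
    flagged = []
    for topic in INTERNAL_TOPICS:
        policy = topic_configs.get(topic, {}).get("cleanup.policy")
        if policy != "compact":
            flagged.append(topic)
    return flagged


# Hypothetical example: the config topic was created by hand with the default policy.
example = {
    "dwh-prod-configs": {"cleanup.policy": "delete"},
    "dwh-prod-offsets": {"cleanup.policy": "compact"},
    "dwh-prod-status": {"cleanup.policy": "compact"},
}
print(non_compacted_topics(example))  # ['dwh-prod-configs']
```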

Tags: apache-kafka apache-kafka-connect


【Solution 1】:

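In distributed mode, Kafka Connect uses KafkaConfigBackingStore as its ConfigBackingStore implementation. KafkaConfigBackingStore provides persistent storage of Kafka Connect connector configurations in a Kafka topic.

According to the docs:

For production systems, the replication factor of this topic should always be at least 3, but it cannot be larger than the number of Kafka brokers in the cluster.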
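The quoted rule is easy to check against the question's settings. A minimal sketch, assuming the worker is configured through the CONNECT_* environment variables shown in the question and that you know the broker count (3, per the comments); `rf_problems` is a hypothetical helper. Unset variables are skipped, since the worker then falls back to its own default.

```python
from typing import Dict, List

# Replication-factor settings from the question's Kubernetes env block
RF_SETTINGS = (
    "CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR",
    "CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR",
    "CONNECT_STATUS_STORAGE_REPLICATION_FACTOR",
)


def rf_problems(env: Dict[str, str], broker_count: int, min_rf: int = 3) -> List[str]:
    """Check each internal-topic replication factor against the documented rule:
    at least min_rf for production, but no more than the number of brokers."""
    problems = []
    for name in RF_SETTINGS:
        if name not in env:
            continue  # not set: the worker uses its default instead
        rf = int(env[name])
        if rf < min_rf:
            problems.append(f"{name}={rf}: below {min_rf}")
        if rf > broker_count:
            problems.append(f"{name}={rf}: exceeds {broker_count} brokers")
    return problems


question_env = {name: "1" for name in RF_SETTINGS}  # all three are "1" in the question
for problem in rf_problems(question_env, broker_count=3):
    print(problem)
```

With the question's values and 3 brokers, all three settings are reported as below 3.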

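I'm not sure, but a replication factor of 1 could explain your problem: if the broker holding the only replica of the config data becomes unavailable for some reason (for example, something K8S does to the pods), Kafka Connect cannot read the current config state.

Try increasing the replication factor of config.storage.topic from 1 to 3. Also check the replication factors of the other internal topics (offset.storage.replication.factor and status.storage.replication.factor).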
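The `*_REPLICATION_FACTOR` settings are only applied when Connect creates the internal topics; changing them does not alter topics that already exist. For an existing topic, the replication factor is usually raised with a partition reassignment, using the JSON file format accepted by `kafka-reassign-partitions` (its `--reassignment-json-file` option). A minimal sketch that builds such a plan, assuming broker IDs 1, 2 and 3 and a current replica on broker 2 (both hypothetical; use your cluster's real IDs and current assignment); `reassignment_plan` is a hypothetical helper.

```python
import json
from typing import Dict, List


def reassignment_plan(topic: str, current_replicas: Dict[int, List[int]],
                      broker_ids: List[int], target_rf: int) -> str:
    """Build a reassignment plan that raises `topic` to target_rf replicas.

    Existing replicas stay first in each list (so the current preferred leader
    is kept); extra replicas are chosen from the other brokers, starting the
    search at a different broker for each partition to spread them out.
    """
    if target_rf > len(broker_ids):
        raise ValueError("replication factor cannot exceed the number of brokers")
    partitions = []
    for partition, replicas in sorted(current_replicas.items()):
        new_replicas = list(replicas)
        start = partition % len(broker_ids)
        candidates = broker_ids[start:] + broker_ids[:start]
        for broker in candidates:
            if len(new_replicas) >= target_rf:
                break
            if broker not in new_replicas:
                new_replicas.append(broker)
        partitions.append({"topic": topic, "partition": partition, "replicas": new_replicas})
    return json.dumps({"version": 1, "partitions": partitions})


# The config topic has a single partition, assumed here to live only on broker 2.
print(reassignment_plan("dwh-prod-configs", {0: [2]}, [1, 2, 3], 3))
```

The printed JSON would be saved to a file and passed to the tool with `--execute`, and the same file can be checked afterwards with `--verify`.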

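【Comments】:

  • Are you sure I can do this while the cluster is online? As far as I remember, I tried it in the past and it caused Kafka Connect to fail to start. But I'll try it in a test environment.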