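【Posted】: 2020-09-02 18:14:19
【Question】:
I have Kafka Connect (version 2.1.1-cp1) working with Kafka (2.0.1-cp4), running around 70 connectors under a very heavy workload. Occasionally (every 2-3 weeks), some nodes suddenly start emitting the following logs and stop working: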
INFO [Worker clientId=connect-1, groupId=dwh-prod] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Worker clientId=connect-1, groupId=dwh-prod] Successfully joined group with generation 2288 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-8532b028-281c-4aca-8440-c4c999812158', leaderUrl='http://10.36.3.136:8083/', offset=2811, connectorIds=[<redacted>]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
WARN Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 is behind group assignment 2811, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Finished reading to end of log and updated config snapshot, new config log offset: -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Current config state offset -1 does not match group assignment 2811. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
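I can see that there may be a problem with getting the configState (offset) from the configBackingStore, but I don't know why: https://github.com/apache/kafka/blob/2.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L823
It seems to start when a new node is spawned (the cluster is hosted in Kubernetes with the vertical pod autoscaler) and it continues indefinitely, for hours.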
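For anyone trying to confirm the same loop in their own worker logs, here is a minimal Python sketch that scans log lines for the "does not match group assignment ... Forcing rebalance" message shown above. The function names and the repeat threshold are my own assumptions for illustration, not anything provided by Kafka Connect.

```python
import re
from typing import Iterable, List, Tuple

# Matches the DistributedHerder message from the log excerpt above.
MISMATCH = re.compile(
    r"Current config state offset (-?\d+) does not match "
    r"group assignment (\d+)\. Forcing rebalance\."
)


def find_forced_rebalances(lines: Iterable[str]) -> List[Tuple[int, int]]:
    """Return (local_offset, assigned_offset) for every forced-rebalance line."""
    hits = []
    for line in lines:
        match = MISMATCH.search(line)
        if match:
            hits.append((int(match.group(1)), int(match.group(2))))
    return hits


def looks_stuck(hits: List[Tuple[int, int]], threshold: int = 3) -> bool:
    """Heuristic (an assumption): the worker is considered stuck when the
    identical offset mismatch repeats at least `threshold` times."""
    return len(hits) >= threshold and len(set(hits)) == 1
```

Feeding it the lines of a worker log (for example `find_forced_rebalances(open(path))`, where `path` is wherever your logs end up) gives the list of mismatches; a long run of identical pairs such as a local offset of -1 against the same assigned offset would match the behaviour described here.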
The setup runs on K8S with the confluentinc/cp-kafka-connect image, using the following environment:
- name: CONNECT_LOG4J_ROOT_LOGLEVEL
  value: INFO
- name: CONNECT_BOOTSTRAP_SERVERS
  value: ***:9092
- name: CONNECT_ZOOKEEPER_CONNECT
  value: ***:2181
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://***:8081
- name: CONNECT_GROUP_ID
  value: dwh-prod
- name: CONNECT_STATUS_STORAGE_TOPIC
  value: dwh-prod-status
- name: CONNECT_CONFIG_STORAGE_TOPIC
  value: dwh-prod-configs
- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: dwh-prod-offsets
- name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
  value: "10000"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_REST_ADVERTISED_HOST_NAME
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: CONNECT_REST_PORT
  value: "8083"
- name: CONNECT_PLUGIN_PATH
  value: /usr/share/java,/usr/share/confluent-hub-components,/usr/share/landoop-plugins
- name: CONNECT_INTERNAL_KEY_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_INTERNAL_VALUE_CONVERTER
  value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_KEY_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: CONNECT_LOG4J_LOGGERS
  value: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
- name: CLASSPATH
  value: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
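Any ideas or hints?
【Comments】:
- Can you share the worker.properties you used to start the worker? Did you create the offsets topic manually, or did you rely on Connect to create it? You may be creating the internal Connect topics with the wrong compaction policy.
- I am running it on K8S with the confluentinc/cp-kafka-connect image. I have pasted the configuration above; I hope that answers your question.
- How many Kafka brokers do you have?
- There are 3 brokers.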
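To make the first comment's suggestion concrete: Kafka Connect expects its internal topics to be compacted (`cleanup.policy=compact`), and the config storage topic to have exactly one partition. Below is a minimal Python sketch of that check. `TopicInfo` and `check_connect_topics` are hypothetical names of mine, and the sketch assumes you have already collected the partition count and `cleanup.policy` for each topic yourself (for example from a topic describe command); it does not talk to the cluster.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TopicInfo:
    """Topic settings gathered by hand; a hypothetical helper type."""
    partitions: int
    cleanup_policy: str  # value of the topic's cleanup.policy setting


def check_connect_topics(config_topic: str,
                         topics: Dict[str, TopicInfo]) -> List[str]:
    """Return human-readable problems found in Connect's internal topics."""
    problems = []
    for name, info in topics.items():
        # Internal Connect topics are expected to be compacted only.
        if info.cleanup_policy.strip() != "compact":
            problems.append(
                f"{name}: cleanup.policy is '{info.cleanup_policy}', expected 'compact'"
            )
        # The config storage topic is expected to have a single partition.
        if name == config_topic and info.partitions != 1:
            problems.append(
                f"{name}: has {info.partitions} partitions, expected exactly 1"
            )
    return problems
```

With the topic names from the question, the call would be `check_connect_topics("dwh-prod-configs", topics)`, where `topics` maps `dwh-prod-configs`, `dwh-prod-offsets` and `dwh-prod-status` to the values observed on the cluster. An empty list means neither of these two checks found a problem; it does not rule out other causes.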
Tags: apache-kafka apache-kafka-connect