【问题标题】:CommitFailedException: Commit cannot be completed since the group has already rebalancedCommitFailedException:由于组已经重新平衡,因此无法完成提交
【发布时间】:2018-07-04 09:53:39
【问题描述】:

环境:Hadoop2.75.+FLink1.4+Kafka0.10

我已经建立了一个实时数据处理项目。我使用 Flink Table 源 API (Kafka010JsonTableSource) 作为 tablaSource。从kafka获取数据,然后执行一条SQL,最后输出到一个kafka topic。流程很清晰,但是我在Flink集群上执行的时候遇到了异常,下面是我的主要代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000) 
val tableEnv = TableEnvironment.getTableEnvironment(env)
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
    .forTopic(kafkaConfig.topic)
    .withKafkaProperties(props)
    .withSchema(dynamicJsonSchema)
    .withRowtimeAttribute(
         enventTimeFieldName,  
         new ExistingField(enventTimeFieldName),  
         new MyBoundedOutOfOrderTimestamps(100L)) 
    .build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))

我已经启用了检查点。我第一次在 flink 上执行,我只是按照消费者的默认配置。 Flink任务运行后,我通过kafka shell命令(kafka-consumer-groups.sh)查看了offsets,发现了一个奇怪的情况。根据 Flink 任务管理器的 shell 命令输出和日志,我发现偏移量在开始的几秒钟内提交成功,但后来我继续遇到许多异常,如下所示:

块引用 2018-01-19 09:24:03,174 警告 org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - 向 Kafka 提交偏移量失败。这不会影响 Flink 的检查点。 org.apache.kafka.clients.consumer.CommitFailedException: 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或通过使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 在 org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 在 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247) 2018-01-19 09:24:03,174 警告 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - 异步 Kafka 提交失败。 org.apache.kafka.clients.consumer.CommitFailedException: 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或通过使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 在 org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 在 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247)

所以我根据上述错误信息搜索了解决方案。有人告诉我应该增加 session.timeout.ms ,然后我跟着它,但仍然失败。之后我尝试了以下多种组合配置进行测试,kafka偏移量一开始总是提交成功,但后来提交失败。实在不知道怎么解决,能不能帮我解决一下?非常感谢!!!!!!

kafka 消费者配置组合如下: { "propertyKey": "session.timeout.ms", "propertyValue": "300000" }, { "propertyKey": "request.timeout.ms", "propertyValue": "505000" }, { "propertyKey": "auto.commit.interval.ms", "propertyValue": "10000" }, { "propertyKey": "max.poll.records", "propertyValue": "50" }, { "propertyKey": "max.poll.interval.ms", "propertyValue": "500000" }, { "propertyKey": "client.id", "propertyValue": "taxi-client-001" }, { "propertyKey": "heartbeat.interval.ms", "propertyValue": "99000" } 我尝试将上述配置更改为各种值,但都失败了,即使我根据 kafka 官方文档指南配置它们。希望各位大神帮忙解决一下上面的错误,非常感谢!!!

【问题讨论】:

  • 您是否尝试将max.poll.interval.ms 增加到略高于每条记录的平均处理时间的某个值?
  • 是的,在我的配置中可以看到max.poll.interval.ms的值是500000,它的默认值是305000,我觉得500000就够了。你认为这个属性应该更大吗?
  • 另一种可能的情况是您有另一个正在运行的同名消费者组。你检查你指定的组 id 是唯一的吗?

标签: scala apache-kafka apache-flink


【解决方案1】:

我找到了根本原因。 总是发生rebalance错误的原因是两个消费者(一个是消费者输入数据,另一个是消费者输出数据)组名相同。我怀疑只有一个协调器没有足够的能力处理两个消费者的偏移提交动作。在我更改一个消费者的组名后,世界突然安静了。错误从未发生过。

【讨论】:

  • 你有什么更有效的解决方案吗?我只是更改了组名,问题就消失了。
  • 这取决于以下配置:session.timeout.ms, max.poll.records。详细检查这些参数以获取更多详细信息。或者请分享您的 kafka 配置。
猜你喜欢
  • 2017-02-06
  • 2019-01-08
  • 2018-01-15
  • 1970-01-01
  • 2018-01-17
  • 1970-01-01
  • 2018-04-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多