【发布时间】:2018-07-04 09:53:39
【问题描述】:
环境:Hadoop2.75.+FLink1.4+Kafka0.10
我已经建立了一个实时数据处理项目。我使用 Flink Table 源 API (Kafka010JsonTableSource) 作为 tablaSource。从kafka获取数据,然后执行一条SQL,最后输出到一个kafka topic。流程很清晰,但是我在Flink集群上执行的时候遇到了异常,下面是我的主要代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000)
val tableEnv = TableEnvironment.getTableEnvironment(env)
val tableSource:KafkaTableSource = Kafka010JsonTableSource.builder()
.forTopic(kafkaConfig.topic)
.withKafkaProperties(props)
.withSchema(dynamicJsonSchema)
.withRowtimeAttribute(
enventTimeFieldName,
new ExistingField(enventTimeFieldName),
new MyBoundedOutOfOrderTimestamps(100L))
.build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult:Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))
我已经启用了检查点。我第一次在 flink 上执行,我只是按照消费者的默认配置。 Flink任务运行后,我通过kafka shell命令(kafka-consumer-groups.sh)查看了offsets,发现了一个奇怪的情况。根据 Flink 任务管理器的 shell 命令输出和日志,我发现偏移量在开始的几秒钟内提交成功,但后来我继续遇到许多异常,如下所示:
块引用 2018-01-19 09:24:03,174 警告 org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - 向 Kafka 提交偏移量失败。这不会影响 Flink 的检查点。 org.apache.kafka.clients.consumer.CommitFailedException: 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或通过使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 在 org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 在 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247) 2018-01-19 09:24:03,174 警告 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - 异步 Kafka 提交失败。 org.apache.kafka.clients.consumer.CommitFailedException: 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或通过使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 在 org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 在 org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 在 org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247)
所以我根据上述错误信息搜索了解决方案。有人告诉我应该增加 session.timeout.ms ,然后我跟着它,但仍然失败。之后我尝试了以下多种组合配置进行测试,kafka偏移量一开始总是提交成功,但后来提交失败。实在不知道怎么解决,能不能帮我解决一下?非常感谢!!!!!!
kafka 消费者配置组合如下:
{
"propertyKey": "session.timeout.ms",
"propertyValue": "300000"
},
{
"propertyKey": "request.timeout.ms",
"propertyValue": "505000"
},
{
"propertyKey": "auto.commit.interval.ms",
"propertyValue": "10000"
},
{
"propertyKey": "max.poll.records",
"propertyValue": "50"
},
{
"propertyKey": "max.poll.interval.ms",
"propertyValue": "500000"
},
{
"propertyKey": "client.id",
"propertyValue": "taxi-client-001"
},
{
"propertyKey": "heartbeat.interval.ms",
"propertyValue": "99000"
}
我尝试将上述配置更改为各种值,但都失败了,即使我根据 kafka 官方文档指南配置它们。希望各位大神帮忙解决一下上面的错误,非常感谢!!!
【问题讨论】:
-
您是否尝试将
max.poll.interval.ms增加到略高于每条记录的平均处理时间的某个值? -
是的,在我的配置中可以看到
max.poll.interval.ms的值是500000,它的默认值是305000,我觉得500000就够了。你认为这个属性应该更大吗? -
另一种可能的情况是您有另一个正在运行的同名消费者组。你检查你指定的组 id 是唯一的吗?
标签: scala apache-kafka apache-flink