【发布时间】:2021-12-12 04:42:54
【问题描述】:
我有一个使用以下设置运行 FlinkSQL 的 Flink 作业:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));
tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");
为了使用 Kafka 源在本地进行测试,我向 Flink 作业触发了一些事件。 Flink UI 显示它产生了一个水印。我等了 3 分钟,看看水印是否在没有发送新事件(即空闲分区)的情况下前进。但是,没有发生水印提升。
注意:我在本地使用具有三个分区的 Kafka 代理。我的测试数据是键控的,因此被发送到同一个分区。但是,即使其他分区空闲并且我等待 3 分钟,我也看不到水印前进。
-
JOB UI 中的任何地方我都可以看到我设置的 3 分钟的值是否真的被拾取?我是否使用了正确的单位(秒 vs 毫秒)
-
还有什么我可以检查来测试这个设置的吗?
我们正在运行 Flink 1.12.1。
更新:我在异常下的 Flink SQL 作业中看到了这个异常:不知道是否存在版本不匹配。
2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
at java.base/java.util.Optional.ifPresent(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
【问题讨论】:
标签: apache-kafka apache-flink flink-sql