【问题标题】:flink 1.12.2 all events getting dropped as lateflink 1.12.2 所有事件都被丢弃
【发布时间】:2021-06-22 11:27:31
【问题描述】:

我的 flink 管道如下所示

FlinkKafkaConsumerBase kafkaConsumer = new FlinkKafkaConsumer<>(topic, new DeserializationSchema(),props);

kafkaSource = env.addSource(kafkaConsumer).filter(<>);
WatermarkStrategy<GenericMetricV2> watermarkStrategy = WatermarkStrategy
                .<GenericMetricV2>forBoundedOutOfOrderness(Duration.ofSeconds(900))
                .withTimestampAssigner((metric, timestamp) -> {
                    logger.info("ETS: mts: {}, ts: {}", metric.metricPoint.timeInstant, timestamp);
                    return metric.metricPoint.timeInstant;
                });

metricStream = kafkasource
                        .process(<>)
                        .assignTimestampsAndWatermarks(watermarkStrategy)
                        .transform("debugFilter", TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
                        .filter(<>)
                        .map(<>)
                        .flatMap(<>)
                        .keyBy(<>)
                        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
                        .allowedLateneess(Time.seconds(900))
                        .sideOutputLateData(lateOutputTag)
                        .aggregate(AggregateFunction, ProcessWindowFunction)
                        .addSink()

我以并行度 1 运行,默认 setAutowatermarkInterval 为 200 毫秒。我没有设置 setStreamTimeCharacteristic 从 flink 1.12 默认是事件时间。

我看到 StreamWatermarkDebugFilter 的输出正在处理水印,但所有事件都被标记为迟到,并且正在收集到 lateOutputTag

2021-05-18 17:14:19,745 INFO                  - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,745 INFO                  - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,842 INFO  StreamWatermarkDebugFilter         - Op, Watermark: 1621309499999
2021-05-18 17:14:19,944 INFO                  - ETS: mts: 1621309800000, ts: 1621310582275
2021-05-18 17:14:19,944 INFO                  - ETS: mts: 1621309800000, ts: 1621310582275
...
2021-05-18 17:14:20,107 INFO                  - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,107 INFO                  - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,137 INFO  StreamWatermarkDebugFilter         - Op, Watermark: 1621309779999
2021-05-18 17:14:20,203 INFO                  - ETS: mts: 1621309800000, ts: 1621310582279
...
2021-05-18 17:17:47,839 INFO                  - ETS: mts: 1621310100000, ts: 1621310681159
2021-05-18 17:17:47,848 INFO  StreamWatermarkDebugFilter         - Op, Watermark: 1621310099999
2021-05-18 17:17:47,958 INFO                  - ETS: mts: 1621309800000, ts: 1621310681237
2021-05-18 17:17:47,958 INFO                  - ETS: mts: 1621309800000, ts: 1621310681237
...
2021-05-18 17:22:24,207 INFO                  - ETS: mts: 1621310100000, ts: 1621310703622
2021-05-18 17:22:24,229 INFO  StreamWatermarkDebugFilter         - Op, Watermark: 1621310399999
2021-05-18 17:22:24,315 INFO                  - ETS: mts: 1621309800000, ts: 1621310705177
2021-05-18 17:22:24,315 INFO                  - ETS: mts: 1621309800000, ts: 1621310705177

我看过这个discussion,这不是闲置问题。

看起来与此discussion 有关。有人可以建议我如何进一步调试此问题以确定可能是什么问题?

【问题讨论】:

  • 对于每一个迟到的事件,我认为在您尚未共享的管道部分中一定有一些奇怪的事情发生。为了调试它,我会将 debugFilter 移到窗口之前,并让它打印出所有事件和水印的时间戳。
  • 我会的。一个查询,因为我不是在 Kafka 源而是在 kafkasource -&gt; process 之后添加水印策略。 process 被调用之前的初始水印是什么?会是kafka记录时间戳吗?如果是,我可以在从 kafka 读取历史数据时禁用 kafka 源的水印生成吗?
  • 一般更喜欢在kafka源码中应用你的水印策略;目前尚不清楚您为什么要避免这种情况(它是历史数据并不重要)。正如您现在拥有的那样,管道的早期部分(在水印生成器之前)没有水印,并且 flink 的 StreamRecord 时间戳最初是 kafka 标头时间戳,然后被您的时间戳分配器覆盖。 kafka 提供的时间戳是在你的时间戳分配器中打印的时间戳。
  • 是的,这不是由于历史数据。我无法将水印策略直接放在 kafka 源中,因为我需要过滤一些数据,然后在 process() 中进行一些处理以获取所需格式的数据,然后才能放置策略。
  • 这是我没有分享的代码部分的问题。我在assignTimestampsAndWatermarks 之后做了filter(),所以我不感兴趣的倾斜数据正在推动水印前进。我在assignTimestampsAndWatermarks 之前移动了filter(),它按预期工作。感谢您建议将 debugFilter 移到有助于识别此问题的窗口之前。

标签: apache-flink flink-streaming


【解决方案1】:

这是我没有分享的部分代码的问题。我在assignTimestampsAndWatermarks() 之后做了一个filter(),所以我不感兴趣的倾斜数据正在推动水印前进。我在assignTimestampsAndWatermarks 之前移动了filter(),它按预期工作。

【讨论】:

    猜你喜欢
    • 2015-08-31
    • 1970-01-01
    • 2022-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多