【问题标题】:Flink how set up initial watermarkFlink 如何设置初始水印
【发布时间】:2018-07-12 14:00:02
【问题描述】:

我正在使用 Flink 1.3.2 和 scala 构建一个流式应用程序,我的 Flink 应用程序将监视一个文件夹并将新文件流式传输到管道中。文件中的每条记录都有一个相关的时间戳。我想将此时间戳用作事件时间并使用AssignerWithPeriodicWatermarks[T] 构建水印,我的水印生成器如下所示:

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
    val maxTimeLag = 6 * 3600000L // 6 hours
    override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
        val timestampString = element.getTimestamp
    }
    override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
  }

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)

val activity = stream
      .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
      .map { line =>
        new tuple.Tuple2(line.id, line.count)
      }.keyBy(0).addSink(...)

但是,由于我的文件夹中有一些旧数据,我不想处理它们。并且旧文件中记录的时间戳> 6小时,应该比水印更早。但是,当我开始运行它时,我仍然可以看到创建了一些初始输出。我想知道水印的初始值是如何设置的,是在第一个间隔之前还是之后?可能是我在这里误解了一些东西,但需要一些建议。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    在您展示的管道中没有任何操作员关心时间——没有窗口,没有 ProcessFunction 计时器——因此每个流元素都将畅通无阻地通过并被处理。如果您的目标是跳过迟到的元素,您需要引入一些东西(以某种方式)将事件时间戳与当前水印进行比较。

    您可以通过在 keyBy 和 sink 之间引入一个步骤来做到这一点,如下所示:

    ...
    .keyBy(0)
    .process(new DropLateEvents())
    .addSink(...)
    
    public static class DropLateEvents extends ProcessFunction<...> {
        @Override
        public void processElement(... event, Context context, Collector<...> out) throws Exception {
            TimerService timerService = context.timerService();
            if (context.timestamp() > timerService.currentWatermark()) {
               out.collect(event);
            }
        }
    }
    

    完成此操作后,您关于初始水印的问题就变得相关了。对于周期性水印,初始水印为 Long.MIN_VALUE,因此在发出第一个水印之前不会被视为迟到,这将在操作 10 秒后发生(考虑到您如何设置自动水印间隔)。

    如果您想更详细地了解周期性水印是如何生成的,相关代码是here

    如果您想避免在前 10 秒内处理迟到的元素,您可以完全忘记使用事件时间和水印,只需修改上面显示的 processElement 方法,将事件时间戳与System.currentTimeMillis() - maxTimeLag 进行比较,而不是与当前水印。另一种解决方案是使用标点水印,并在第一个事件中发出水印。

    或者更简单地说,您可以在 flatMap 或过滤器中检测并删除延迟事件,因为您定义的是相对于 System.currentTimeMillis() 而不是水印的延迟。

    【讨论】:

    • 这很全面,感谢@alpinegizmo,我对此很陌生。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多