【发布时间】:2018-07-12 14:00:02
【问题描述】:
我正在使用 Flink 1.3.2 和 scala 构建一个流式应用程序,我的 Flink 应用程序将监视一个文件夹并将新文件流式传输到管道中。文件中的每条记录都有一个相关的时间戳。我想将此时间戳用作事件时间并使用AssignerWithPeriodicWatermarks[T] 构建水印,我的水印生成器如下所示:
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
val maxTimeLag = 6 * 3600000L // 6 hours
override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
val timestampString = element.getTimestamp
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)
val activity = stream
.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
.map { line =>
new tuple.Tuple2(line.id, line.count)
}.keyBy(0).addSink(...)
但是,由于我的文件夹中有一些旧数据,我不想处理它们。并且旧文件中记录的时间戳> 6小时,应该比水印更早。但是,当我开始运行它时,我仍然可以看到创建了一些初始输出。我想知道水印的初始值是如何设置的,是在第一个间隔之前还是之后?可能是我在这里误解了一些东西,但需要一些建议。
【问题讨论】:
标签: apache-flink flink-streaming