【发布时间】:2018-05-16 21:59:57
【问题描述】:
我想在 Flink 中使用基于历史事件的流计算基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(不是基于处理时间):
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我发现了如何在摄取时添加时间戳:
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
但是当我进行计算(应用函数)时,当我以与没有 EventTime 时相同的方式进行计算时,它不起作用。我已经阅读了一些关于我必须设置的水印的内容:
val avg = stream
.keyBy("instrument")
.timeWindow(Time.seconds(10))
.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
val avg = values.map(_.val).sum / values.size
val dp = Datapoint(key.getField[String](0), avg)
out.collect(dp)
})
avg.print()
env.execute()
有人有一个简单的 Scala 示例吗?
问候,
安德烈亚斯
【问题讨论】:
标签: scala apache-flink flink-streaming