【问题标题】:Compute average value with Flink and Event-Time-Based stream使用 Flink 和 Event-Time-Based 流计算平均值
【发布时间】:2018-05-16 21:59:57
【问题描述】:

我想在 Flink 中使用基于历史事件的流计算基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(不是基于处理时间):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

我发现了如何在摄取时添加时间戳:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)

但是当我进行计算(应用函数)时,当我以与没有 EventTime 时相同的方式进行计算时,它不起作用。我已经阅读了一些关于我必须设置的水印的内容:

val avg = stream
  .keyBy("instrument")
  .timeWindow(Time.seconds(10))
  .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
    val avg = values.map(_.val).sum / values.size
    val dp = Datapoint(key.getField[String](0), avg)
    out.collect(dp)
  })

avg.print()
env.execute()

有人有一个简单的 Scala 示例吗?

问候,
安德烈亚斯

【问题讨论】:

    标签: scala apache-flink flink-streaming


    【解决方案1】:

    水印实际上是一个时间戳,它断言所有具有较早时间戳的事件已经(可能)已经到达。基于事件时间的窗口依赖于水印来知道窗口何时完成。到目前为止,最常见的水印策略是假设事件到达有一定的延迟。

    如果您想在数据源中发出水印(在摄取期间),请参阅Source Functions with Timestamps and Watermarks,但它很简单

    ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime))
    

    另一方面,如果您想在源之外处理此问题,请参阅 Timestamp Assigners / Watermark GeneratorsAssigners allowing a fixed amount of lateness。你可以简单地做这样的事情:

    stream
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))( _.getTimestamp ))
      .keyBy("instrument")
      ...
    

    我链接到的文档在 Scala 中有更详细的示例。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-14
    • 1970-01-01
    • 2013-02-19
    • 1970-01-01
    相关资源
    最近更新 更多