【问题标题】:Reassigning timestamps, watermarks in Flink?在 Flink 中重新分配时间戳、水印?
【发布时间】:2019-02-09 06:45:06
【问题描述】:

考虑我这样做:

DataStream<POJO> ds = ...
ds.assignTimestampsAndWatermarks(CustomAssigner)
.windowAll(...)
.apply(someFunction) //THIS FUNCTION CHANGES THE TIMESTAMP FIELD IN THE EVENTS
.assignTimestampsAndWatermarks(AnotherCustomAssigner)

这有效吗?我不知道水印/时间戳是全局的还是仅保留在数据流中?

编辑

class POJO{
   int timestamp;
   String someDetail; //key by this
   ...
}

数据流 ds = ....

ds.assignTimeStampsAndWatermarks(new AssignerWithPunctuatedWatermarks(){
   long maxTS = Long.MIN_VALUE;
   Watermarks checkAndGetNextWater(POJO, p, long l){
  maxTS = max(...)
  return new Watermarks(maxTS);
}

long ExtractTS(POJO p, long l){
  maxTS = max(...)
  return p.timeStamp;
}


  }).keyBy(someDetail property)
     .window(TumblingWindow(1 min))
      .apply(new AllWindowFunction<POJO, POJO, String, TimeWindow>(){
  public void apply(...){
    POJO newPOJO = ...;
    for(POJO it : iterable){
      newPOJO.timeStamp += ...
    }
    collector.collect(newPOJO);
  }
}) 

现在我想知道

如果我应该再次分配时间戳,因为我想做windowAll,然后再次apply

assignTimestamp...
.windowAll(..)
.apply(some other allwindow function)

【问题讨论】:

  • 谁能解释一下什么是对象重用?我应该使用apply 还是别的什么?

标签: java apache-flink


【解决方案1】:

您不应再次调用assignTimestampsAndWatermarks。 Flink 将忽略由 WindowFunction 创建的 POJO 中的时间戳,并将使用从该窗口结束时的时间派生的时间戳为包装这些事件的流记录添加时间戳。通常这可以正常工作,尽管后续窗口需要覆盖一个时间范围,该时间范围是第一个时间范围的整数倍。

如果您尝试构建一个全新的流,该流应该重新加时间戳并拥有自己的新水印,那么再次调用 assignTimestampsAndWatermarks 可能会奏效。

【讨论】:

  • .apply(someFunction) 修改从窗口传入的事件的时间戳。例如,在我的窗口中,我可能有时间戳为12, 13, 14, 52, 19 的事件,.apply(someFunction) 返回的聚合对象的时间戳为12,如果状态/对象/时间戳已经修改的?谢谢
  • 你能分享你正在使用的代码吗?我想看看你使用的是 TimeWindows 还是其他类型,WindowFunction 是如何构造它收集的对象的,你是否启用了对象重用,以及其他详细信息。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多