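【Question Title】: Evaluate only the latest window for event time based sliding windows
【Posted】: 2017-12-07 22:49:51
【Question】:

I want to process events with sliding windows in event time. The window size is 24 hours and the slide is 30 minutes. The problem is that the code below produces 48 evaluations for each event, because every event falls into 48 overlapping windows. In our case the events arrive in order, so we only need to evaluate the latest window.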

Thanks,

Dejan

public static void processEventsa(
        DataStream<Tuple2<String, MyEvent>> events) throws Exception {

    events.assignTimestampsAndWatermarks(new MyWatermark()).
            keyBy(0).
            timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds)).
            apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input,
                                          Collector<Tuple2<String, MyEvent>> out) throws Exception {

                    for (Tuple2<String, MyEvent> record : input) {
                        // per-record processing omitted in the original post
                    }
                }
            });
}

public class MyWatermark implements
        AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> {

    @Override
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
        return event.f1.eventTime;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
        return new Watermark(event.f1.eventTime);
    }
}

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
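
The figure of 48 evaluations per event follows from the window arithmetic: a 24-hour window sliding every 30 minutes means each timestamp is covered by 24 h / 30 min = 48 overlapping windows. The sketch below reproduces that count in plain Java, without any Flink dependency. The class and method names (`SlidingWindowMath`, `latestWindowStart`, `windowStarts`) are made up for illustration, and it assumes a zero window offset and non-negative epoch-millisecond timestamps.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowMath {

    /** Start of the most recent sliding window that contains the timestamp (zero offset assumed). */
    public static long latestWindowStart(long timestampMs, long slideMs) {
        return timestampMs - (timestampMs % slideMs);
    }

    /** Starts of all windows [start, start + sizeMs) that contain the timestamp, newest first. */
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        for (long start = latestWindowStart(timestampMs, slideMs);
             start > timestampMs - sizeMs;
             start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 24L * 60 * 60 * 1000;  // 24-hour window size
        long slideMs = 30L * 60 * 1000;      // 30-minute slide
        long eventTimeMs = 1512686991000L;   // arbitrary example timestamp in epoch millis

        List<Long> starts = windowStarts(eventTimeMs, sizeMs, slideMs);
        System.out.println("windows per event: " + starts.size()); // prints "windows per event: 48"
        System.out.println("latest window start: " + starts.get(0));
    }
}
```

The first element of the returned list is the "latest window" the question refers to; the other 47 are older windows that still contain the event.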

【Comments】:

    Tags: apache-flink flink-streaming


    【Solution 1】:

    The problem was in the watermark assigner. AssignerWithPeriodicWatermarks should be used instead:

    public class MyWatermark implements
            AssignerWithPeriodicWatermarks<Tuple2<String, MyEvent>> {

        // Watermarks trail the wall clock by this many milliseconds.
        private final long maxTimeLag = 5000;

        @Override
        public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
            try {
                return event.f1.eventTime;
            }
            catch (NullPointerException ex) {
                // Event or payload is missing: fall through to the wall-clock fallback below.
            }

            return System.currentTimeMillis() - maxTimeLag;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Called periodically by Flink rather than once per event.
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }
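
    The assigner above derives its watermark from System.currentTimeMillis(), so it tracks the wall clock rather than the timestamps carried by the events. If that is a concern (for example when replaying older data), one possible variation is to base the periodic watermark on the largest event timestamp seen so far. The following is only a plain-Java sketch of that bookkeeping, not the answer's method: `EventTimeWatermarkTracker` and its methods are hypothetical names and it uses no Flink types. Flink 1.x also ships `BoundedOutOfOrdernessTimestampExtractor`, which, as far as I know, works along similar lines.

```java
public class EventTimeWatermarkTracker {

    private final long maxLagMs;
    // Largest event timestamp observed so far; Long.MIN_VALUE means "no events yet".
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    public EventTimeWatermarkTracker(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Per-event hook: records the timestamp and returns it unchanged. */
    public long onEvent(long eventTimeMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimeMs);
        return eventTimeMs;
    }

    /** Periodic hook: the watermark trails the largest seen event time by maxLagMs. */
    public long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress can be claimed
        }
        return maxSeenTimestampMs - maxLagMs;
    }

    public static void main(String[] args) {
        EventTimeWatermarkTracker tracker = new EventTimeWatermarkTracker(5000);
        tracker.onEvent(10_000);
        tracker.onEvent(8_000); // an older event does not move the watermark backwards
        System.out.println(tracker.currentWatermark()); // prints 5000
    }
}
```

    In a real job the two hooks would correspond to extractTimestamp and getCurrentWatermark of the periodic assigner shown above.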
    
