【Posted】: 2017-12-07 22:49:51
【Question】:
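I want to process events with a sliding window in event time. The window is 24 hours long and slides in 30-minute increments. The problem is that the code below triggers 48 computations for every event. In our case the events arrive in order, so we only need to evaluate the latest window.
Thanks,
Dejan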
public static void processEventsa(
        DataStream<Tuple2<String, MyEvent>> events) throws Exception {
    events
        .assignTimestampsAndWatermarks(new MyWatermark())
        .keyBy(0)
        // Sliding window: size in hours, slide in seconds (24 h and 30 min per the description above)
        .timeWindow(Time.hours(windowSizeHour), Time.seconds(windowSlideSeconds))
        .apply(new WindowFunction<Tuple2<String, MyEvent>, Tuple2<String, MyEvent>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, MyEvent>> input,
                    Collector<Tuple2<String, MyEvent>> out) throws Exception {
                // Called once for every window that fires, so each event is visited in all 48 windows
                for (Tuple2<String, MyEvent> record : input) {
                    // per-record processing goes here
                }
            }
        });
}
public class MyWatermark implements
        AssignerWithPunctuatedWatermarks<Tuple2<String, MyEvent>> {

    @Override
    public long extractTimestamp(Tuple2<String, MyEvent> event, long previousElementTimestamp) {
        // Event time is carried inside the payload
        return event.f1.eventTime;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, MyEvent> event, long extractedTimestamp) {
        // Emits a watermark for every element; safe only because events arrive in order
        return new Watermark(event.f1.eventTime);
    }
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
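
To make the "48 computations per event" figure concrete, here is a minimal sketch in plain Java (no Flink dependency) of the arithmetic a sliding window assigner typically performs. The class name `SlidingWindowSketch`, the method `windowStarts`, and the sample timestamp are hypothetical names chosen for illustration, and the calculation assumes a window offset of zero. It is meant to show why every event belongs to size / slide windows, not to reproduce Flink's implementation exactly.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    // Returns the start (epoch millis) of every window [start, start + size)
    // that contains the given timestamp, newest window first.
    // Assumption: windows are aligned to multiples of the slide (offset 0).
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recently opened window that contains the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 24L * 60 * 60 * 1000;   // 24-hour window
        long slideMs = 30L * 60 * 1000;       // 30-minute slide
        long eventTimeMs = 1_512_686_991_000L; // arbitrary example event time

        List<Long> starts = windowStarts(eventTimeMs, sizeMs, slideMs);
        System.out.println(starts.size()); // prints 48

        // The "latest" window is the one that opened most recently
        long latestStart = starts.get(0);
        System.out.println("latest window: [" + latestStart + ", " + (latestStart + sizeMs) + ")");
    }
}
```

Since the window size is an exact multiple of the slide, every event falls into 24 h / 30 min = 48 overlapping windows, and the window function runs once per window when each one fires. Evaluating only the most recently opened window would mean restricting the work to the first entry of that list, for example through a custom trigger or window assigner, which is the part the question asks about.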
【Discussion】:
Tags: apache-flink flink-streaming