一、Window类型
1、时间窗口(TimeWindow)
(1)滚动时间窗口(Tumbling Window)
将数据依据固定的窗口长度对数据进行切分
特点:时间对齐,窗口长度固定,没有重叠
(2)滑动时间窗口(Sliding Window)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
特点:窗口长度固定,可以有重叠
(3)会话窗口(Session Window)
由一系列事件组合一个指定事件长度的timeout间隔组成,就也就是一段时间没有收到新数据就会生成新的窗口。
特点:时间不对齐
2、计数窗口(CountWindow)
(1)滚动计数窗口
(2)滑动计数窗口
二、Window API
Flink提供了两种窗口API
1、基于DataStream的windowAll
public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
return new AllWindowedStream(this, assigner);
}
也可以使用实现了WindowAssigner的timeWindowAll和countWindowAll
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.windowAll(TumblingProcessingTimeWindows.of(size)) : this.windowAll(TumblingEventTimeWindows.of(size));
}
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.windowAll(SlidingProcessingTimeWindows.of(size, slide)) : this.windowAll(SlidingEventTimeWindows.of(size, slide));
}
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
return this.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
return this.windowAll(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
}
2、基于KeyedStream的window
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream(this, assigner);
}
也可以使用实现了WindowAssigner的timeWindow和countWindow
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.window(TumblingProcessingTimeWindows.of(size)) : this.window(TumblingEventTimeWindows.of(size));
}
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.window(SlidingProcessingTimeWindows.of(size, slide)) : this.window(SlidingEventTimeWindows.of(size, slide));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return this.window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
}
3、WindowAssigner
在windowAll和window函数中都有一个入参是WindowAssigner,现在看下WindowAssigner机制
WindowAssigner负责将每条输入的数据分发到正确的window中,用来决定某个元素被分配到哪个/哪些窗口中去。
Flink提供了通用的WindowAssigner:滚动窗口,滑动窗口,会话窗口,全局窗口
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
public WindowAssigner() {
}
public abstract Collection<W> assignWindows(T var1, long var2, WindowAssigner.WindowAssignerContext var4);
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);
public abstract boolean isEventTime();
public abstract static class WindowAssignerContext {
public WindowAssignerContext() {
}
public abstract long getCurrentProcessingTime();
}
}
Flink内部提供了丰富的实现
(1)Trigger:触发器。决定了一个窗口何时能够被计算或清除,每个窗口都会拥有一个自己的Trigger。
Flink内置的Trigger
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
public Trigger() {
}
//当每个元素被添加窗口时调用。
public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
//当注册的处理时间计时器(processing-time timer)触发时调用。
public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;
//当注册的事件时间计时器(event-time timer)触发时调用。
public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;
public boolean canMerge() {
return false;
}
//与状态触发器相关,并且在相应的窗口合并时合并两个触发器的状态
public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
//执行删除相应窗口所需的任何操作
public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
public interface OnMergeContext extends Trigger.TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> var1);
}
public interface TriggerContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
void registerProcessingTimeTimer(long var1);
void registerEventTimeTimer(long var1);
void deleteProcessingTimeTimer(long var1);
void deleteEventTimeTimer(long var1);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> var1);
/** @deprecated */
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String var1, Class<S> var2, S var3);
/** @deprecated */
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String var1, TypeInformation<S> var2, S var3);
}
}
public enum TriggerResult {
//什么都不做
CONTINUE(false, false),
//触发计算,然后清除窗口中的元素
FIRE_AND_PURGE(true, true),
//触发计算
FIRE(true, false),
//清除窗口中的元素
PURGE(false, true);
private final boolean fire;
private final boolean purge;
private TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return this.fire;
}
public boolean isPurge() {
return this.purge;
}
}
触发器触发时,可以是FIRE或FIRE_AND_PURGE。当是FIRE时保留窗口的内容,当时FIRE_AND_PURGE时会删除其内容。默认情况下,内置的触发器只返回FIRE,不会清除窗口状态
清除只是简单地删除窗口的内容,并留下关于窗口和任何触发状态的任何潜在元信息。
(2)Evictor:可以译为“驱逐者”。在Trigger触发之后,在窗口被处理之前,Evictor(如果有Evictor的话)会用来剔除窗口中不需要的元素,相当于一个filter。
驱逐器能够在触发器触发之后,在应用窗口函数之前或之后从窗口中移除元素,也可以之前之后都删除元素( The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied.)。为此,Evictor接口有两种方法:
public interface Evictor<T, W extends Window> extends Serializable {
//在窗口函数之前应用
void evictBefore(Iterable<TimestampedValue<T>> var1, int var2, W var3, Evictor.EvictorContext var4);
//在窗口函数之后应用,在应用窗口函数之前被逐出的元素将不被处理
void evictAfter(Iterable<TimestampedValue<T>> var1, int var2, W var3, Evictor.EvictorContext var4);
public interface EvictorContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
}
}
Flink带有三个内置的驱逐器:
(1) CountEvictor:保持窗口内用户指定数量的元素,如果多于用户指定的数量,从窗口缓冲区的开头丢弃剩余的元素。
(2) DeltaEvictor:使用DeltaFunction和阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,并删除delta值大于或等于阈值的元素(computes the delta between the last element in the window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold)。
(3) TimeEvictor:以毫秒为单位的时间间隔作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。
注意:
(1)默认情况下,所有内置的驱逐器在窗口函数之前应用
(2)Flink不保证窗口内元素的顺序。 这意味着虽然驱逐者可以从窗口的开头移除元素,但这些元素不一定是先到的还是后到的
4、窗口函数
定义好窗口的WindowAssigner后,需要定义如何对窗口里的数据做何种操作
Flink对此类操作划分为两类
(1)增量聚合函数(每条数据到来就计算)
AggregateFunction
ReduceFunction
(2)全窗口函数(先收集窗口的所有数据,等到窗口被触发计算时再遍历窗口内的所有数据)
ProcessWindowFunction