【发布时间】:2018-03-29 07:43:09
【问题描述】:
当窗口中有 5 个事件时,我使用自定义触发器触发。这适用于 TumblingEventTimeWindow 和 SlidingTimeWindow,因为它们都有固定的窗口开始和结束。但是该逻辑不适用于会话窗口,因为每个事件都会导致创建一个窗口并稍后合并。我正在使用减少状态来计算事件。
在必须计算事件的会话窗口的情况下如何处理这种情况?
代码要点: https://gist.github.com/thepythonista/4ad2f8c41f56aaea6ebf13fd9392c4bc
其他问题: 我能够在 OnMergeContext 的 mergePartitionState 方法中使用 ReducingStateDescriptor。事件被正确计算。但是当我尝试对 ValueStateDescriptor 使用 mergePartitionState 方法时,会引发编译时错误
ReducingStateDescriptor<Long> eventCounterDescriptor = new ReducingStateDescriptor<>("COUNT", new Sum(), LongSerializer.INSTANCE);
ValueStateDescriptor<Boolean> exitEventDescriptor = new ValueStateDescriptor<>("EXIT_EVENT", Boolean.class);
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
ctx.mergePartitionedState(reducingStateDescriptor); // can do this
ctx.mergePartitionedState(valueStateDescriptor); // won't compile
ctx.registerEventTimeTimer(window.maxTimestamp());
}
Error:(133, 16) java: method mergePartitionedState in interface org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext cannot be applied to given types;
required: org.apache.flink.api.common.state.StateDescriptor<S,?>
found: org.apache.flink.api.common.state.ValueStateDescriptor<java.lang.Boolean>
reason: inference variable S has incompatible bounds
equality constraints: org.apache.flink.api.common.state.ValueState<java.lang.Boolean>
upper bounds: org.apache.flink.api.common.state.MergingState<?,?>
【问题讨论】:
标签: apache-flink flink-streaming