Problem / requirement:
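In day-to-day requirements we often need a running aggregate, for example the total count of one day's data, with the current result emitted once per minute.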
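1) window + trigger: this is the code below. The drawback is that you cannot tell which of the emitted results is the final aggregate for the window. As the code below shows, one option is to store the window's end time in state, compare it against the end time seen on each firing, and use that to pick out and emit the final aggregate value.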
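import com.google.common.collect.Lists;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;

DataStream<String> aggregateStream = nextStream
        // Real setting: one-day window, offset by -8 hours so it lines up with midnight in UTC+8.
        // .windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        // Short setting for testing: 10-second window, fired every second.
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .process(new ProcessAllWindowFunction<NextEvent, String, TimeWindow>() {
            // Largest window end timestamp seen so far.
            private transient ValueState<Long> valueState2;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<Long> valueStateDescriptor =
                        new ValueStateDescriptor<>("lastUserLogin", TypeInformation.of(new TypeHint<Long>() {
                        }));
                valueState2 = getRuntimeContext().getState(valueStateDescriptor);
            }

            @Override
            public void process(Context context, Iterable<NextEvent> iterable, Collector<String> out) throws Exception {
                // The trigger fires without purging, so each firing sees everything buffered in the window so far.
                List<NextEvent> myList = Lists.newArrayList(iterable);
                System.out.println(myList);

                Long value = valueState2.value();
                long end = context.window().getEnd();
                // Keep the maximum window end in state.
                if (value == null) {
                    valueState2.update(end);
                } else {
                    long max = Math.max(value, end);
                    valueState2.update(max);
                }
                Long value22 = valueState2.value();
                System.out.println(" state:" + value22);
                long start = context.window().getStart();
                out.collect("start:" + start + "-----end:" + end + " state:" + value22);
            }
        });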
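The code above only records the largest window end in state; it does not yet act on the comparison. Below is a minimal plain-Java sketch (no Flink dependency) of one way the "compare the stored end, then emit the final value" step could work: when a firing arrives for a later window, the last result remembered for the previous window is treated as that window's final aggregate. The class name FinalResultTracker and the method onFiring are hypothetical names made up for this illustration, and the two fields stand in for Flink state. Treat it as a sketch of the idea, not as the way Flink itself marks a final firing.

```java
/** Plain-Java sketch of "remember the window end and compare"; all names here are hypothetical. */
class FinalResultTracker {
    // Largest window end seen so far; null until the first firing (plays the role of ValueState<Long>).
    private Long lastWindowEnd;
    // Most recent result produced for the window identified by lastWindowEnd.
    private String lastResult;

    /**
     * Records one trigger firing.
     * Returns the final result of the previous window if this firing belongs to a later window,
     * otherwise null (same window, very first firing, or a stale firing for an older window).
     */
    String onFiring(long windowEnd, String result) {
        String finished = null;
        if (lastWindowEnd != null && windowEnd > lastWindowEnd) {
            // A later window has started, so the remembered result was the previous window's last one.
            finished = lastResult;
        }
        if (lastWindowEnd == null || windowEnd >= lastWindowEnd) {
            // Remember the newest result for the current (or new) window; stale firings are ignored.
            lastWindowEnd = windowEnd;
            lastResult = result;
        }
        return finished;
    }

    public static void main(String[] args) {
        FinalResultTracker tracker = new FinalResultTracker();
        long[] windowEnds = {10_000L, 10_000L, 10_000L, 20_000L, 20_000L};
        String[] results = {"total=1", "total=4", "total=7", "total=2", "total=5"};
        for (int i = 0; i < windowEnds.length; i++) {
            String finished = tracker.onFiring(windowEnds[i], results[i]);
            if (finished != null) {
                System.out.println("final result of previous window: " + finished);
            }
        }
    }
}
```

One consequence of this design is that a window's final value is only known once the next window fires at least once, so the final result arrives one firing late. If that delay matters, the decision would have to be made from the firing time and the window end instead.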
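2) Attach two windows in parallel after the same ordinary stream. An expert at Alibaba said that writing it this way does not hurt performance, but you should verify that yourself.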