【发布时间】:2019-06-14 15:47:34
【问题描述】:
如何在 Apache Flink 中为会话窗口分配 id?
最终我想在会话窗口打开时使用会话窗口 id 一个接一个地丰富事件(我不想等到窗口关闭后再发出丰富的事件)。
我尝试使用 AggregateFunction 来执行此操作,但我认为 merge() 无法按预期工作。它似乎是用于合并窗口而不是窗格(触发触发)。我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态!
会话窗口 id 将是第一个进入窗口的事件的时间戳(由于非保证排序,这可能意味着某些事件可能会进入具有更早时间戳的同一会话窗口 - 我没问题用这个)。
public class FooSessionState {
private Long sessionCreationTime;
private FooMatch lastMatch;
}
/**
* Aggregator that assigns session ids to elements of a session window
*/
public class SessionIdAssigner implements
AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {
static final long serialVersionUID = 0L;
@Override
public FooSessionState createAccumulator() {
return new FooSessionState();
}
@Override
public FooSessionState add(FooMatch value, FooSessionState sessionState) {
if (sessionState.getSessionCreationTime() == null) {
sessionState.setSessionCreationTime(value.getReport().getTimestamp());
}
sessionState.setLastMatch(value);
return sessionState;
}
@Override
public FooSessionEvent getResult(FooSessionState accumulator) {
FooSessionEvent sessionEvent = new FooSessionEvent();
sessionEvent.setFooMatch(accumulator.getLastMatch());
sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
return sessionEvent;
}
@Override
public FooSessionState merge(FooSessionState a, FooSessionState b) {
if ( a.getSessionCreationTime() != null) {
b.setSessionCreationTime(a.getSessionCreationTime());
}
return b;
}
}
我的计划是按如下方式使用它:
stream.keyBy(new FooMatchKeySelector())
.window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
.trigger(PurgingTrigger.of(CountTrigger.of(1L)))
.aggregate(new SessionIdAssigner())
【问题讨论】:
标签: apache-flink flink-streaming