【问题标题】:How do I assign an id to a session window in Apache Flink?如何为 Apache Flink 中的会话窗口分配 id?
【发布时间】:2019-06-14 15:47:34
【问题描述】:

如何在 Apache Flink 中为会话窗口分配 id?

最终我想在会话窗口打开时使用会话窗口 id 一个接一个地丰富事件(我不想等到窗口关闭后再发出丰富的事件)。

我尝试使用 AggregateFunction 来执行此操作,但我认为 merge() 无法按预期工作。它似乎是用于合并窗口而不是窗格(触发触发)。我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态!

会话窗口 id 将是第一个进入窗口的事件的时间戳(由于非保证排序,这可能意味着某些事件可能会进入具有更早时间戳的同一会话窗口 - 我没问题用这个)。

public class FooSessionState {

  private Long sessionCreationTime;

  private FooMatch lastMatch;
}

/**
 * Aggregator that assigns session ids to elements of a session window
 */
public class SessionIdAssigner implements
    AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {

  static final long serialVersionUID = 0L;

  @Override
  public FooSessionState createAccumulator() {
    return new FooSessionState();
  }

  @Override
  public FooSessionState add(FooMatch value, FooSessionState sessionState) {
    if (sessionState.getSessionCreationTime() == null) {
      sessionState.setSessionCreationTime(value.getReport().getTimestamp());
    }
    sessionState.setLastMatch(value);
    return sessionState;
  }

  @Override
  public FooSessionEvent getResult(FooSessionState accumulator) {
    FooSessionEvent sessionEvent = new FooSessionEvent();
    sessionEvent.setFooMatch(accumulator.getLastMatch());
    sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
    return sessionEvent;

  }

  @Override
  public FooSessionState merge(FooSessionState a, FooSessionState b) {

    if ( a.getSessionCreationTime() != null) {
      b.setSessionCreationTime(a.getSessionCreationTime());
    }

    return b;
  }
}

我的计划是按如下方式使用它:

stream.keyBy(new FooMatchKeySelector())
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
    .trigger(PurgingTrigger.of(CountTrigger.of(1L)))
    .aggregate(new SessionIdAssigner())

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我认为会话窗口不适合您想要实现的目标。它们被设计为聚合每个会话的事件,但不是为了丰富每个事件,即它们计算结果并在窗口关闭时发出它。正如您所注意到的,会话窗口通过为每个事件创建一个新窗口并合并重叠的窗口来工作。之所以选择这种设计,是因为事件可能会乱序到达。因此,您可能有两个窗口稍后通过桥接事件连接。

    我建议使用ProcessFunction 来实现逻辑,该ProcessFunction 收集事件并根据它们的时间戳对其进行排序。收到水印后,它会发出所有收集到的具有正确会话 ID 的事件。因此,您仅将两个水印之间的事件保持在状态。除了这些事件之外,您还需要保留上次发出的事件的时间戳和上次发出的会话 ID 以执行正确的会话。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多