【问题标题】:How to use Flink streaming timeWindow with timestamp and watermark assigners?如何将 Flink 流式 timeWindow 与时间戳和水印分配器一起使用?
【发布时间】:2017-02-01 08:46:06
【问题描述】:

我正在开发一个从 Kafka 读取事件的 Flink 流处理器。这些事件由其中一个字段键入,并且应该在一段时间内窗口化,然后再减少和输出。我的处理器使用事件时间作为时间特征,因此从它消耗的事件中读取时间戳。这是它目前的样子:

source
    .map(new MapEvent())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override public long extractTimestamp(Event event) {
                return event.getTimestamp();
            }
        })
    .keyBy(new KeySelector())
    .timeWindow(Time.minutes(1))
    .reduce(new EventReducer())
    .map(new MapToResult());

我对这些事件的了解如下:

  1. 它们的事件时间是无序的。
  2. 延迟到达是可能的,因此事件到达的时间可能比时间戳所说的要晚得多。为了便于使用,假设我知道,最晚可能的到达时间为 20 秒。
  3. 我希望我的活动在 Flink 将它们转发到下面的 reduce 运算符之前恰好窗口化一分钟。

最后,这是我的问题:

  1. 鉴于我之前描述的用例,BoundedOutOfOrdernessTimestampExtractor 是一个不错的选择吗?我已经阅读了文档,看到了 AssignerWithPunctuatedWatermarks 和其他可用于创建水印的预定义分配器,但不完全理解,如果这些对我来说会更好。
  2. assignTimestampsAndWatermarks() 如何与timeWindow() 方法一起使用?当涉及迟到时,他们可以干涉吗?在该区域我有什么需要牢记的吗?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    我认为我们应该从水印概念开始。简而言之,水印表示大多数具有较早时间戳的事件已经到达。基于该假设,当水印通过窗口末尾时,timeWindow 可以发出窗口。当然,仍然可能会出现迟到的情况,这是人们可能想要处理的。这里出现了allowedLateness 的概念,它指定了在发出窗口后多久我们应该跟踪那里的元素,以便我们可以例如用那些迟到的事件更新我们的接收器(但必须记住,没有这个窗口已经发出元素)。希望这能以某种方式回答您的第二个问题。

    回到您的第一个问题,如果您有许多可能会延迟 20 秒的事件,我认为BoundedOutOfOrdernessTimestampExtractor 是最佳选择。这种方式虽然发射每个窗口都会延迟 20 秒。如果迟到是相当零星的,并且您可以处理重复,那么您可能会考虑另一个。

    您提到的AssignerWithPunctuatedWatermarks,正如文档所说,应该在您的流中的某些特定事件已经充当水印的情况下使用。所以不要认为它适合你的用例。

    有关水印的更多信息,您可以阅读 docthisthat

    【讨论】:

    • 谢谢,这有助于理解这两个东西是如何协同工作的。
    • 好的,我的实现取得了一些进展,还编写了一些测试来检查延迟和窗口提取。这些测试表现得很奇怪,因为它们产生不可重现的结果。也许我仍然缺少一些东西。这是我在测试中所做的。我首先创建了一堆实体,它们的事件时间恰好适合一个 timeWindow(1 分钟)。然后我创建另一个实体,即迟到(在窗口外和允许的迟到)。之后,我再次从第一步创建完全相同的事件(相同的事件时间)并将它们发送到 flink。结果因运行而异。
    • 至于结果的正确性,我需要看一些例子。但是出于随机性,我怀疑您没有在StreamExecutionEnvironment 上启用TimeCharacteristic.EventTime,是这样吗?
    • 我将它设置为TimeCharacteristic.EventTime,问题是ExecutionConfig.setAutoWatermarkInterval(...) 的标准设置。根据 Flink 调用 getCurrentWatermark 的时间跨度,窗口关闭的方式不同,从而导致不同的结果。
    【解决方案2】:

    可能你的watermark总是小于window-endtime,这样就不会触发window产生结果。window触发方法的要点如下:

    1. 水印>=窗口结束时间。
    2. 此窗口中有一些元素。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-01-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多