【问题标题】:Watermark with TumblingWindow in Apache FlinkApache Flink 中带有 TumblingWindow 的水印
【发布时间】:2021-11-12 19:34:07
【问题描述】:

我试图了解 Apache FLink 中 Windows 和 Watermark 生成之间的依赖关系,下面的示例出现错误:

   public static void main(String[] args) throws Exception {

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().setAutoWatermarkInterval(10000);

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("watermarkFlink", new SimpleStringSchema(), props);
        DataStream<String> orderStream = env.addSource(kafkaSource);
        DataStream<Order> dataStream = orderStream.map(str -> {
                    String[] order = str.split(",");
                    return new Order(order[0], Long.parseLong(order[1]), null);
                });

        WatermarkStrategy<Order> orderWatermarkStrategy = CustomWatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((element, timestamp) ->
                        element.getTimestamp()
                );
        dataStream
            .assignTimestampsAndWatermarks(orderWatermarkStrategy)
            .map(new OrderKeyValue())
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> src) throws Exception {
                    return src.f0;
                }
            })
            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
            .sum(1)
            .print("Windows");

            dataStream.print("Data");

            env.execute();
        }


    public static class OrderKeyValue implements MapFunction<Order, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Order order) {
            return new Tuple2<>(order.getCategory(), 1);
        }
    }

这里的时间戳是一个很长的时间戳,我们可以从 Kafka 源中检索到它应该是:A,4 C,8 其中 C 是类别,5 是时间戳。

每当我发送一个事件时,数据流正在打印,但不是这些与窗口(打印(“Windows”))。 另外,例如,如果我收到一个事件 A,12,然后我生成了一个水印(在 10 秒内),那么我有 C,2 在第一个窗口关闭之后出现,它会在窗口中处理还是只是忽略?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    Flink 文档中有一个教程可以帮助阐明这些概念:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/streaming_analytics/

    但是总结一下情况:

    • 如果您有像 (A,4) (C,8) (A,12) 这样的事件流,那么这些整数将被解释为毫秒。

    • 您的第一个窗口将等待水印为 20000 才被触发。

    • 要生成这么大的水印,您需要一个时间戳至少为 21000 的事件(因为有界无序设置为 1 秒)。

    • 由于您已将自动水印间隔配置为 10 秒,因此您的应用程序必须运行很长时间才能生成第一个水印。 (我想不出任何设置这么大的水印间隔有帮助的情况。)

    • 如果一个事件在其窗口关闭后到达,那么它将被忽略(默认情况下)。您可以配置允许的延迟以安排延迟事件来触发额外的窗口触发。

    【讨论】:

    • 感谢@David,这真的很有帮助,而且有效。在此示例中添加 Checkopointing 实现使其无法正常工作?你知道为什么吗 ?你有任何检查点的例子吗?谢谢
    • 添加检查点不应导致它停止工作。请提出一个新问题,提供足够的详细信息以了解代码和配置是如何更改的,以及它现在是如何失败的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多