【问题标题】:Apache Flink - calculate the difference of value between two consecutive event with event timeApache Flink - 计算两个连续事件与事件时间之间的差异
【发布时间】:2020-09-12 16:40:35
【问题描述】:

我有一些电能表会不断产生计数器值,这是一个累积度量。即不断增加直到计数器重置。

Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1            {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1            {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1            {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.

有一个实时 ETL 作业可以在事件时间的两个连续值之间进行减法。

例如

10-10-2019 10:20:30  = 21 - 10 = 11
10-10-2019 10:20:40  = 25 - 21 = 4
10-10-2019 10:20:55  = 37 - 25 = 12
. 
.
.

此外,有时可能无法按顺序接收事件。

如何使用 Apache Flink Streaming API 实现? Java 中的例子更好。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    一般来说,当需要按顺序处理乱序流时,最简单(也是高效)的处理方法是使用 Flink SQL,并依靠它来进行排序。请注意,它将依赖WatermarkStrategy 来确定何时可以安全地认为事件已准备好发出,并将丢弃任何迟到的事件。如果您必须了解迟到的事件,那么我建议您使用CEP 而不是带有 MATCH_RECOGNIZE 的 SQL(如下所示)。

    有关使用水印进行排序的更多信息,请参阅the tutorial about Watermarks in the Flink docs

    以下是如何使用 Flink SQL 实现用例的示例:

    public class SortAndDiff {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                    new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                    new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                    new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                    new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                    new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                    new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                    new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
            .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
                @Override
                public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                    return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
                }
            }).assignTimestampsAndWatermarks(
                    WatermarkStrategy
                        .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> event.f1));
    
            Table events = tableEnv.fromDataStream(input,
                    $("sensorId"),
                    $("ts").rowtime(),
                    $("kwh"));
    
            Table results = tableEnv.sqlQuery(
                    "SELECT E.* " +
                        "FROM " + events + " " +
                        "MATCH_RECOGNIZE ( " +
                            "PARTITION BY sensorId " +
                            "ORDER BY ts " +
                            "MEASURES " +
                                "this_step.ts AS ts, " +
                                "next_step.kwh - this_step.kwh AS diff " +
                            "AFTER MATCH SKIP TO NEXT ROW " +
                            "PATTERN (this_step next_step) " +
                            "DEFINE " +
                                "this_step AS TRUE, " +
                                "next_step AS TRUE " +
                        ") AS E"
            );
    
    
            tableEnv
                    .toAppendStream(results, Row.class)
                    .print();
    
            env.execute();
        }
    
    }
    

    【讨论】:

    • 未来是否有任何计划来捕获延迟事件而不是丢弃它并将这些延迟事件发送到 Flink SQL 中的单独接收器(或任何可能的提取方式)?
    • 这还没有在路线图上。见issues.apache.org/jira/browse/FLINK-10031
    • 如果您使用 DataStream API 进行数据摄取和加水印,那么在使用 Table/SQL api 进行进一步处理之前,可以很容易地使用带有侧输出的 ProcessFunction 来捕获任何延迟事件。
    • 感谢分享这个大卫,我知道 DataStream API 中的侧面输出。但有兴趣了解 Flink SQL 中的侧输出延迟事件。由于语法简单,我的一些工作是由 BI 工程师使用 Flink SQL 创建的,因此有兴趣了解这一点。 JIRA 链接正是我想要的。现在,我将自己添加为工单的听众。
    • 感谢大卫,您的解决方案很有魅力。但是,当我使用 socketTextStream 模拟传感器事件时,总是不会考虑最近的两个事件。这会导致处理延迟。我怀疑它可能与水印问题有关,但我尝试了 withIdleness 和 setAutoWatermarkInterval 配置仍然没有运气。你有什么建议吗?
    猜你喜欢
    • 2017-02-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-02
    • 1970-01-01
    • 2017-07-03
    相关资源
    最近更新 更多