【问题标题】:Flink temporal join works only for a few secondsFlink temporal join 只工作几秒钟
【发布时间】:2021-11-22 09:22:31
【问题描述】:

我正在尝试在 Flink 中实现事件时间临时连接。这是第一个连接表:

   tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
            "`timestamp` TIMESTAMP_LTZ(3)," +
            "`area` STRING," +
            "`networkEdge` STRING," +
            "`vehiclesNumber` BIGINT," +
            "`averageSpeed` INTEGER," +
            "WATERMARK FOR `timestamp` AS `timestamp`" +
            ") WITH (" +
            "'connector' = 'kafka'," +
            "'topic' = 'seneca.trafficdata.aggregated'," +
            "'properties.bootstrap.servers' = 'localhost:9092'," +
            "'properties.group.id' = 'traffic-data-aggregation-job'," +
            "'format' = 'json'," +
            "'json.timestamp-format.standard' = 'ISO-8601'" +
            ")");

该表用作以下查询的接收器:

   Table aggregatedTrafficData = trafficData
            .window(Slide.over(lit(30).seconds())
                    .every(lit(15).seconds())
                    .on($("timestamp"))
                    .as("w"))
            .groupBy($("w"), $("networkEdge"), $("area"))
            .select(
                    $("w").end().as("timestamp"),
                    $("area"),
                    $("networkEdge"),
                    $("plate").count().as("vehiclesNumber"),
                    $("speed").avg().as("averageSpeed")
            );

这是另一个连接表。我使用 Debezium 将 Postgres 表流式传输到 Kafka:

    tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
            "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
            "`urn` STRING," +
            "`flow_rate` INTEGER," +
            "PRIMARY KEY(`urn`) NOT ENFORCED," +
            "WATERMARK FOR `timestamp` AS `timestamp`" +
            ") WITH (" +
            "'connector' = 'kafka'," +
            "'topic' = 'seneca.network.transport_network_edge'," +
            "'scan.startup.mode' = 'latest-offset'," +
            "'properties.bootstrap.servers' = 'localhost:9092'," +
            "'properties.group.id' = 'traffic-data-aggregation-job'," +
            "'format' = 'debezium-json'," +
            "'debezium-json.schema-include' = 'true'" +
            ")");

最后是时间连接:

    Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
            "congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka " +
            "JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
            "ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");

我遇到的问题是连接仅在最初的几秒钟内有效(在 Postgres 表中更新之后),但我需要继续使用 debezium 连接第一个表。难道我做错了什么? 谢谢 尤克斯

【问题讨论】:

    标签: join apache-flink debezium flink-sql


    【解决方案1】:

    使用您正在使用的 AS OF 语法的临时连接需要:

    • 具有有效事件时间属性的仅追加表
    • 具有主键和有效事件时间属性的更新表
    • 主键上的相等谓词

    当 Flink SQL 的时间运算符应用于事件时间流时,水印在确定何时产生结果以及何时清除状态方面起着至关重要的作用。

    执行时间连接时:

    • 仅附加表中的行在 Flink 状态下被缓冲,直到连接运算符的当前水印达到它们的时间戳
    • 对于版本化表,对于每个键,其时间戳在连接运算符的当前水印之前的最新版本以及当前水印之后的任何版本都保持在状态中
    • 只要连接运算符的水印前进,就会产生新结果,并清除不再相关的状态

    join 操作符跟踪它从其输入通道接收到的水印,其当前水印始终是这两个水印中的最小值。这就是为什么您的加入会停止,并且只有在 flow_rate 更新时才会取得进展。

    解决此问题的一种方法是为 TransportNetworkEdge_Kafka 表设置水印,如下所示:

    "WATERMARK FOR `timestamp` AS " + Watermark.MAX_WATERMARK
    

    这会将这个表/流的水印设置为可能的最大值,这将使来自这个流的水印变得无关紧要——这个流的水印永远不会是最小的。

    但是,这样做的缺点是连接结果不确定。

    【讨论】:

    • 滑动窗口查询“提供”在时间连接中使用的 AggregatedTrafficData_Kafka 表。流已经空闲是什么意思? AggregatedTrafficData_Kafka 每 15 秒发出一次数据,但 TransportNetworkEdge_Kafka 仅在“flow_rate”参数有更新时才发出数据(这种情况很少发生变化)。
    • 实际上,AggregatedTrafficData_Kafka 每 15 秒发出一次数据。临时连接工作了几秒钟然后停止。如果我更新 Postgres 中的 flow_rate 参数(因此 TransportNetworkEdge_Kafka 更新),则连接恢复为先前发出的 AggregatedTrafficData_Kafka 事件工作,并在几秒钟后再次停止。在实践中,我必须定期更新 Postgres 表以使其正常工作,但我希望即使没有表更新也能正常工作。
    • 现在我明白发生了什么,并且我已经更新了我的答案。
    • 谢谢。我尝试使用最大的水印值,现在它可以按预期连续工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多