【问题标题】:how to implement timeWindow() in Apache Flink's StreamTableEnvironment?如何在 Apache Flink 的 StreamTableEnvironment 中实现 timeWindow()?
【发布时间】:2020-08-31 07:01:56
【问题描述】:

大家, 我想在 StreamTableEnvironment 中使用 flink 时间窗口。

我之前使用了 timeWindow(Time.seconds()) 函数和来自 kafka 主题的 dataStream。 对于外部问题,我将此 DataStream 转换为 DataTable 并使用 sqlQuery() 应用 SQL 查询。

我想用 SQL 做 x 次第二时间窗口聚合,然后将其发送到另一个 kafka 主题

数据来源:

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

以前的聚合示例:

    val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

当前数据表:

    val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

在这部分应该有一个查询,每次 X 次进行一次聚合

    val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

我的聚合或多或少将是 count(y) OVER(PARTITION BY x) 谢谢!

【问题讨论】:

    标签: apache-spark apache-kafka stream apache-flink flink-sql


    【解决方案1】:

    Ververica's training for Flink SQL 会帮你解决这个问题。在Querying Dynamic Tables with SQL 部分中包含一些仅涵盖此类查询的练习/示例。

    您必须为每个事件建立时间信息的来源,可以是处理时间或事件时间,之后与stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))对应的查询将是这样的:

    SELECT
      x,
      TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
      COUNT(*) AS cnt
    FROM Events
    GROUP BY
      x, TUMBLE(timestamp, INTERVAL '5' SECOND);
    

    有关如何使用时间属性的详细信息,请参阅Introduction to Time Attributes

    有关使用 Flink SQL 窗口化的更详细文档,请参阅Group Windows 上的文档。

    【讨论】:

    • 嗨大卫,我如何使用 TUMBLE() 这个查询... SELECT user, product, c_num FROM (SELECT *, COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC) as c_num FROM 订单)
    • 我不清楚您要完成什么。您能否编辑您的问题以包含与您想使用 SQL 计算的数据流 api 等效的内容?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-02-26
    • 1970-01-01
    • 2020-08-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多