【发布时间】:2020-08-31 07:01:56
【问题描述】:
大家, 我想在 StreamTableEnvironment 中使用 flink 时间窗口。
我之前使用了 timeWindow(Time.seconds()) 函数和来自 kafka 主题的 dataStream。 对于外部问题,我将此 DataStream 转换为 DataTable 并使用 sqlQuery() 应用 SQL 查询。
我想用 SQL 做 x 次第二时间窗口聚合,然后将其发送到另一个 kafka 主题
数据来源:
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
以前的聚合示例:
val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
当前数据表:
val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)
在这部分应该有一个查询,每次 X 次进行一次聚合
val result = tableEnv.sqlQuery(
s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)
我的聚合或多或少将是 count(y) OVER(PARTITION BY x) 谢谢!
【问题讨论】:
标签: apache-spark apache-kafka stream apache-flink flink-sql