【发布时间】:2020-08-30 05:04:49
【问题描述】:
大家, 我有一个 kafka 主题源,我将它按 1 分钟窗口分组。 我想在那个窗口中做的是用 SQL 中的窗口函数创建新列,例如我想使用
- SUM(amount) OVER(PARTITION BY
- COUNT(user) OVER(PARTITION BY
- ROW_NUMBER() OVER(PARTITION BY
我可以对这些操作使用 DataStream 函数吗? 或
如何操作我的 kafka 数据将其转换为 DataTable 并使用 sqlQuery?
Destination 是另一个 kafka 主题。
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
我已经尝试过这样做
val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)
但我收到以下错误
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
测试数据
1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2
查询示例
SELECT
user, product, amount,
COUNT(user) OVER(PARTITION BY product) AS count_product
FROM table;
预期表现
1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
【问题讨论】:
-
请发布一份来自 Kafka 的数据样本。
-
同时贴出你要实现的SQL查询。
-
您的流是字符串类型,您正在尝试从中访问字段。这解释了你的例外。
Too many fields referenced from an atomic type -
你好,查询是:SELECT user, product, amount, COUNT(user) OVER(PARTITION BY product) AS count_product FROM table;
-
如何应用该方案以便将其转换为表格?
标签: scala apache-kafka streaming apache-flink