【发布时间】:2019-08-23 13:19:30
【问题描述】:
我有一个Dataset<Row>,其中包含六列,如下所示:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188264901 | 0002019000000| 0 | 0 |Voltage | 5 |
|1554188264901 | 0002019000000| 0 | 0 |SetPoint | 7 |
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188276412 | 0002019000000| 0 | 0 |SetPoint | 10 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
最终目标:
根据MAX(time) 获取最后更新的行,用于thingId、controller、module 和variableName 的组合。
因此,所需的输出应该在所有行中都包含 MAX(time),而其余的 variableName 值应该包含 last_updatedValue。
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
并且variableName 列有两个值('Voltage' 和 'SetPoint')用于这个特定的 thingId、控制器和模块,因此对于列 variableName 中的值 Voltage,它应该返回 最后更新的行 值 Voltage 和 MAX(time)。
如下所示,预期输出:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
我尝试了什么:
我试过Scalar sub-query来得到这个,但是子查询中的列应该已经聚合了,我尝试了多种方法,但没有运气。
例如下面这段代码:
Dataset<Row> inputds = spark.read().format("avro").load("hdfs://path");
inputds.createOrReplaceTempView("abc");
Dataset<Row> update = spark.sql("select MAX(p.time) max_time, p.thingId, p.controller, p.module, p.variableName, (SELECT d.value from abc d where d.thingId=p.thingId and d.controller=p.controller and d.module=p.module and d.variableName=p.variableName group by thingId,controller,module,variableName,value) as values from abc p")
update.show();
引发错误:
必须为标量子查询聚合相关的标量变量
我该如何解决这个问题?如果有任何解决方法,请建议我。
谢谢!
【问题讨论】:
-
您需要在查询结束时使用 GROUP BY 子句。
-
@jarlh,问题中的代码更新为
group by并抛出the output of correlated scalar sub-query must be aggregated -
暂时移除子查询。查询是否运行?如果没有,请添加 GROUP BY。
-
@jarlh,它返回五列作为上面的预期输出,除了列
value -
@Johwhite, Long
标签: sql apache-spark group-by apache-spark-sql