【问题标题】:Correlated scalar variable must be aggregated for a scalar sub-query in spark必须为 spark 中的标量子查询聚合相关标量变量
【发布时间】:2019-08-23 13:19:30
【问题描述】:

我有一个Dataset<Row>,其中包含六列,如下所示:

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188264901  |  0002019000000|        0       | 0     |Voltage       |    5   |
 |1554188264901  |  0002019000000|        0       | 0     |SetPoint      |    7   |
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188276412  |  0002019000000|        0       | 0     |SetPoint      |    10  |  
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

最终目标:

根据MAX(time) 获取最后更新的行,用于thingIdcontrollermodulevariableName 的组合。

因此,所需的输出应该在所有行中都包含 MAX(time),而其余的 variableName 值应该包含 last_updatedValue。

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

并且variableName 列有两个值('Voltage''SetPoint')用于这个特定的 thingId、控制器和模块,因此对于列 variableName 中的值 Voltage,它应该返回 最后更新的行VoltageMAX(time)

如下所示,预期输出:

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

我尝试了什么:

我试过Scalar sub-query来得到这个,但是子查询中的列应该已经聚合了,我尝试了多种方法,但没有运气。

例如下面这段代码:

 Dataset<Row> inputds = spark.read().format("avro").load("hdfs://path");
 inputds.createOrReplaceTempView("abc");
 Dataset<Row> update = spark.sql("select MAX(p.time) max_time, p.thingId, p.controller, p.module, p.variableName, (SELECT d.value from abc d where d.thingId=p.thingId and d.controller=p.controller and d.module=p.module and d.variableName=p.variableName group by thingId,controller,module,variableName,value) as values from abc p")
 update.show();

引发错误:

必须为标量子查询聚合相关的标量变量

我该如何解决这个问题?如果有任何解决方法,请建议我。

谢谢!

【问题讨论】:

  • 您需要在查询结束时使用 GROUP BY 子句。
  • @jarlh,问题中的代码更新为 group by 并抛出 the output of correlated scalar sub-query must be aggregated
  • 暂时移除子查询。查询是否运行?如果没有,请添加 GROUP BY。
  • @jarlh,它返回五列作为上面的预期输出,除了列value
  • @Johwhite, Long

标签: sql apache-spark group-by apache-spark-sql


【解决方案1】:

问题似乎是您实际上需要聚合排序

您需要有与MAX(time) 直接相关的值,对于列的特定分组值 variableName,所以基本上是在同一行上的值。由于 SQL 中没有执行此操作的聚合函数,因此您可以对子查询结果进行排序。

所以要达到想要的 "last updated" 行,你按@对子查询进行排序987654326@,降序,然后将结果限制为 1 行。

可能是这样的:

Dataset<Row> update = spark.sql("SELECT
    MAX(p.time) max_time,
    p.thingId, p.controller, p.module, p.variableName,
    (SELECT d.value FROM abc d WHERE d.thingId=p.thingId AND d.controller=p.controller AND d.module=p.module AND d.variableName=p.variableName
        ORDER BY time DESC LIMIT 1) AS [lastUpdatedValue]
FROM abc p
GROUP BY thingId,controller,module,variableName")

附:我来自 SQL Server 背景,所以我通常会这样做TOP 1。我不完全确定 LIMIT 1 在 Apache Spark SQL 中具有相同的效果。

编辑:我找到了this,感谢这个答案here

基本上它是在讨论 spark 中的聚合函数,称为 first

也许在子查询中使用它可以解决问题?

    (SELECT first(d.value) FROM abc d WHERE d.thingId=p.thingId AND d.controller=p.controller AND d.module=p.module AND d.variableName=p.variableName
        ORDER BY time DESC LIMIT 1) AS [lastUpdatedValue]

【讨论】:

  • 我试过抛出同样的错误correlated scalar sub-query must be aggregated
  • Here 我找到了更多信息,也许对你有帮助。也检查我上次的编辑!
  • 同样的错误correlated scalar sub-query must be aggregated,谢谢你的链接,现在就试试吧!
【解决方案2】:

我最终在 spark 数据集中使用struct 解决了这个问题。

输入数据集

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188264901  |  0002019000000|        0       | 0     |Voltage       |    5   |
 |1554188264901  |  0002019000000|        0       | 0     |SetPoint      |    7   |
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188276412  |  0002019000000|        0       | 0     |SetPoint      |    10  |  
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

 Dataset<Row> intermediate = inputDS.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");

 //above code gives me intermediate output
 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

所以现在我的任务是从 time 列中获取最大值,并为使用的 sql 的那个 thingId、控制器和模块填充它,如下所示

intermediate.createOrReplaceTempView("intermediate");

Dataset<Row> outputDS = spark.sql("select B.time,A.thingId,A.controller,A.module,A.variableName,A.value from intermediate A 
inner join (select thingId,controller,module,MAX(time)time from intermediate group by thingId,controller,module) B 
on A.thingId=B.thingId and A.controller=B.controller and A.module=B.module");

这给了我们预期输出

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 +---------------+---------------+----------------+-------+--------------+--------+

所以我现在可以旋转以获取每个 thingId、控制器和模块的最后更新值

我知道中间步骤的sql 有内部联接,如果我能找出一些有效的sql 查询而不是内部联接,那就太好了。

感谢@johwhite 的帮助

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-15
    • 1970-01-01
    • 2020-01-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多