【Posted】: 2019-04-04 14:20:30
【Question】:
I have a Spark Dataset<Row> named inputDS that looks like this:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
It is produced by step 1:
Dataset<Row> inputDS = readInput.groupBy("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");
Expected output:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
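Here the time column holds MAX(time) for thingId, controller, module and variableName.
The end goal is to get the last updated value for each thingId, controller, module and variableName, based on MAX(time).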
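To make the target concrete, here is a minimal in-memory sketch in plain Java (ordinary collections, not Spark) of the transformation the expected output implies. It assumes, as the SQL in this question does, that the maximum time is taken per (thingId, controller, module) and then written onto every row of that group. The names Reading, LatestTimeSketch and withGroupMaxTime are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for one row of inputDS (not a Spark type).
final class Reading {
    final long time;
    final String thingId;
    final int controller;
    final int module;
    final String variableName;
    final String value;

    Reading(long time, String thingId, int controller, int module,
            String variableName, String value) {
        this.time = time;
        this.thingId = thingId;
        this.controller = controller;
        this.module = module;
        this.variableName = variableName;
        this.value = value;
    }
}

final class LatestTimeSketch {
    // Assumption: the grouping key is (thingId, controller, module), as in the SQL query.
    private static String key(Reading r) {
        return r.thingId + "|" + r.controller + "|" + r.module;
    }

    // Returns a copy of the rows in which each time is replaced by the
    // maximum time found in that row's group; all other fields are kept.
    static List<Reading> withGroupMaxTime(List<Reading> rows) {
        Map<String, Long> maxTime = new HashMap<>();
        for (Reading r : rows) {
            maxTime.merge(key(r), r.time, Long::max);
        }
        List<Reading> out = new ArrayList<>();
        for (Reading r : rows) {
            out.add(new Reading(maxTime.get(key(r)), r.thingId, r.controller,
                    r.module, r.variableName, r.value));
        }
        return out;
    }
}
```

Applied to the two sample rows above, both rows end up with time 1554188639406 while Voltage keeps the value 9 and SetPoint keeps the value 6, which matches the expected output table.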
Code:
inputDS.createOrReplaceTempView("intermediate");
Dataset<Row> outputDS = spark.sql(
    "select B.time, A.thingId, A.controller, A.module, A.variableName, A.value from intermediate A "
    + "inner join (select thingId, controller, module, MAX(time) as time from intermediate group by thingId, controller, module) B "
    + "on A.thingId = B.thingId and A.controller = B.controller and A.module = B.module");
The SQL query works as expected, but the inner join does not look efficient.
1) Is there a more efficient way to get the expected output without an inner join or an equivalent where condition?
2) It would be great if the expected output could be obtained directly from STEP 1:
Dataset<Row> intermediate = inputDS.groupBy("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");
【Comments】:
-
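Short of using analytic functions, which may not even be available in Spark SQL, your current join query is perfectly valid. If performance is a concern, look into ways of tuning the join query.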
-
@TimBiegeleisen, Spark does support analytic functions; take a look at this link: databricks.com/blog/2015/07/15/…
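The analytic-function route mentioned in this comment could be sketched roughly as follows. This is only an illustration under stated assumptions: the temp view is named intermediate as in the question, and the class and method names (WindowQuerySketch, latestTimeQuery) are hypothetical. The sketch only builds the SQL text, so it runs without a Spark installation; the query itself uses the standard MAX(...) OVER (PARTITION BY ...) window syntax that Spark SQL accepts.

```java
final class WindowQuerySketch {
    // Builds a Spark SQL statement that tags every row of the given temp view
    // with the MAX(time) of its (thingId, controller, module) partition,
    // so the view does not have to be joined with itself.
    static String latestTimeQuery(String viewName) {
        return "SELECT MAX(time) OVER (PARTITION BY thingId, controller, module) AS time, "
             + "thingId, controller, module, variableName, value "
             + "FROM " + viewName;
    }

    public static void main(String[] args) {
        // With a live SparkSession named spark (assumed, as in the question):
        //   Dataset<Row> outputDS = spark.sql(latestTimeQuery("intermediate"));
        System.out.println(latestTimeQuery("intermediate"));
    }
}
```

For rows whose key columns are non-null this should return the same rows as the inner-join version. A window still shuffles data by the partition columns, so whether it is actually faster than the join depends on the data and is worth checking in the query plan.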
Tags: sql apache-spark apache-spark-sql