I want to do this kind of query through the Scala DataFrame API.
tl;dr Up to Spark 2.1.0 this is not possible. There are currently no plans to add such an operator to the Dataset API.
Spark SQL supports the following so-called multi-dimensional aggregate operators:
- rollup operator
- cube operator
- GROUPING SETS clause (only in SQL mode)
- grouping() and grouping_id() functions
Note: GROUPING SETS is only available in SQL mode. The Dataset API does not support it.
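Of the operators listed above, rollup and cube are exposed directly on Dataset, so those two special cases of grouping sets need no SQL. The following is a minimal sketch using the same sample sales data as the rest of this answer; the names byRollup and byCube are only illustrative, and desc_nulls_last / asc_nulls_last assume Spark 2.1.0 or later.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// In spark-shell a session named `spark` already exists; getOrCreate() reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rollup-cube-sketch")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Boston", 2015, 50),
  ("Boston", 2016, 150),
  ("Toronto", 2017, 50)
).toDF("city", "year", "amount")

// rollup covers the grouping sets (city, year), (city) and ()
val byRollup = sales
  .rollup("city", "year")
  .agg(sum("amount") as "amount")
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)

// cube additionally covers the (year) grouping set
val byCube = sales
  .cube("city", "year")
  .agg(sum("amount") as "amount")
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)

byRollup.show()
byCube.show()
```

Any other combination of grouping sets still has to go through the SQL GROUPING SETS clause.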
GROUPING SETS
val sales = Seq(
("Warsaw", 2016, 100),
("Warsaw", 2017, 200),
("Boston", 2015, 50),
("Boston", 2016, 150),
("Toronto", 2017, 50)
).toDF("city", "year", "amount")
sales.createOrReplaceTempView("sales")
// equivalent to rollup("city", "year")
val q = sql("""
SELECT city, year, sum(amount) as amount
FROM sales
GROUP BY city, year
GROUPING SETS ((city, year), (city), ())
ORDER BY city DESC NULLS LAST, year ASC NULLS LAST
""")
scala> q.show
+-------+----+------+
| city|year|amount|
+-------+----+------+
| Warsaw|2016| 100|
| Warsaw|2017| 200|
| Warsaw|null| 300|
|Toronto|2017| 50|
|Toronto|null| 50|
| Boston|2015| 50|
| Boston|2016| 150|
| Boston|null| 200|
| null|null| 550| <-- grand total across all cities and years
+-------+----+------+
// equivalent to cube("city", "year")
// note the additional (year) grouping set
val q = sql("""
SELECT city, year, sum(amount) as amount
FROM sales
GROUP BY city, year
GROUPING SETS ((city, year), (city), (year), ())
ORDER BY city DESC NULLS LAST, year ASC NULLS LAST
""")
scala> q.show
+-------+----+------+
| city|year|amount|
+-------+----+------+
| Warsaw|2016| 100|
| Warsaw|2017| 200|
| Warsaw|null| 300|
|Toronto|2017| 50|
|Toronto|null| 50|
| Boston|2015| 50|
| Boston|2016| 150|
| Boston|null| 200|
| null|2015| 50| <-- total across all cities in 2015
| null|2016| 250| <-- total across all cities in 2016
| null|2017| 250| <-- total across all cities in 2017
| null|null| 550|
+-------+----+------+
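If SQL is not an option, arbitrary grouping sets can be approximated with the DataFrame API by running one groupBy per grouping set and unioning the results. This is only a sketch of a workaround, not a native operator: the helper aggregateFor and the names groupingColumns and groupingSets are hypothetical, and the source data is aggregated once per grouping set rather than in a single pass.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, sum}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

// In spark-shell a session named `spark` already exists; getOrCreate() reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("grouping-sets-workaround")
  .getOrCreate()
import spark.implicits._

val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Boston", 2015, 50),
  ("Boston", 2016, 150),
  ("Toronto", 2017, 50)
).toDF("city", "year", "amount")

// All columns that may appear in a grouping set, with their types,
// so that omitted columns can be filled with correctly typed nulls.
val groupingColumns: Seq[(String, DataType)] =
  Seq("city" -> StringType, "year" -> IntegerType)

// Hypothetical helper: aggregates by one grouping set and pads the
// remaining grouping columns with null so every result has the same schema.
def aggregateFor(groupCols: Seq[String]): DataFrame = {
  val grouped = sales
    .groupBy(groupCols.map(col): _*)
    .agg(sum("amount") as "amount")
  val projected = groupingColumns.map { case (name, dataType) =>
    if (groupCols.contains(name)) col(name)
    else lit(null).cast(dataType).as(name)
  }
  grouped.select(projected :+ col("amount"): _*)
}

// Same grouping sets as GROUPING SETS ((city, year), (city), ())
val groupingSets = Seq(Seq("city", "year"), Seq("city"), Seq.empty[String])

val result = groupingSets
  .map(aggregateFor)
  .reduce(_ union _)
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)

result.show()
```

Because each grouping set is computed separately, this is likely to be slower than the SQL clause on large inputs, and unlike the SQL version it gives no grouping() information about which nulls come from aggregation.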
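If a value in a column of the result table is null, it does not necessarily mean that the column was aggregated on that row. If the column contained nulls in the original table, a null in the aggregated table may simply represent a null value from the original table. Use the grouping function to check whether a column is aggregated on a particular row.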
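To illustrate the point about ambiguous nulls, here is a small sketch that uses grouping and grouping_id from org.apache.spark.sql.functions together with rollup. The input is made-up data with one row whose city is genuinely null (an assumption for the example, not part of the sales table above), and the output column names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{grouping, grouping_id, sum}

// In spark-shell a session named `spark` already exists; getOrCreate() reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("grouping-function-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: the last row has a real null in the city column.
val salesWithNull = Seq[(Option[String], Int, Int)](
  (Some("Warsaw"), 2016, 100),
  (Some("Warsaw"), 2017, 200),
  (None, 2017, 70)
).toDF("city", "year", "amount")

val flagged = salesWithNull
  .rollup("city", "year")
  .agg(
    sum("amount") as "amount",
    // 1 if the column is aggregated on this row, 0 if it holds a group value
    grouping("city") as "city_aggregated",
    grouping("year") as "year_aggregated",
    // bit vector combining the grouping flags of all grouping columns
    grouping_id() as "gid"
  )

// Rows where city is null because of the source data, not because of aggregation
val genuineNullCity = flagged.filter($"city".isNull && $"city_aggregated" === 0)

// The grand total row: both columns aggregated
val grandTotal = flagged.filter($"city_aggregated" === 1 && $"year_aggregated" === 1)

flagged.show()
```

Filtering on the grouping flags rather than on isNull keeps subtotal rows separate from rows that merely carry a null from the input.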