【Posted】: 2021-07-05 21:54:50
【Problem description】:
I have partitioned my data by classId and sorted it by one column (rank), as shown below:
+-------+---------+----+
|classId|studentId|rank|
+-------+---------+----+
|1      |123      |1   |
|1      |5000     |2   |
|1      |5000     |3   |
|1      |5000     |4   |
|1      |908      |5   |
|1      |908      |6   |
|2      |123      |1   |
|2      |123      |2   |
|2      |123      |3   |
|2      |908      |4   |
+-------+---------+----+
Now I want the following output: for each classId, an array of distinct studentIds in the order given by the rank column.
+-------+----------------------------------+
|classId|studentIds                        |
+-------+----------------------------------+
|1      |[123, 5000, 908]                  |
|2      |[123, 908]                        |
+-------+----------------------------------+
I tried collect_list over the partition, but that gives me the IDs in the correct order with duplicates:
+-------+---------------------------------+
|classId|studentIds                       |
+-------+---------------------------------+
|1      |[123, 5000, 5000, 5000, 908, 908]|
|2      |[123, 123, 123, 908]             |
+-------+---------------------------------+
I also tried collect_set over the partition. It gives me distinct values, but the student IDs are not in the correct order:
+-------+----------------+
|classId|studentIds      |
+-------+----------------+
|1      |[5000, 123, 908]|
|2      |[123, 908]      |
+-------+----------------+
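Code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, collect_list, last}
import spark.implicits._ // needed for toDF; assumes a SparkSession named `spark`, as in spark-shell

// Sample data: (classId, studentId, rank)
val simpleData = Seq(("2", "123", 1), ("2", "908", 4),
  ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
  ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3)
)
val df = simpleData.toDF("classId", "studentId", "rank")

// Processing: build a running collect_list per class ordered by rank,
// then keep one list per class (this is the attempt that still contains duplicates)
df.sort(asc("classId"), asc("rank"))
  .withColumn("studentIds", collect_list("studentId")
    .over(Window.partitionBy("classId").orderBy("rank")))
  .groupBy("classId")
  .agg(last("studentIds") as "studentIds")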
【Discussion】:
Tags: scala dataframe apache-spark apache-spark-sql
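One possible way to get distinct studentIds in rank order, offered as a minimal sketch rather than a definitive answer: reduce each (classId, studentId) pair to its lowest rank, collect (rank, studentId) structs per class, sort the array, and then project out the studentId field. The object name OrderedDistinctStudents, the helper orderedStudentIds, and the intermediate column names firstRank and ranked are made up for illustration; the local SparkSession in main is likewise only an assumption for running the sketch standalone.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, min, sort_array, struct}

object OrderedDistinctStudents {

  // Hypothetical helper: expects columns classId, studentId, rank.
  def orderedStudentIds(df: DataFrame): DataFrame =
    df
      // One row per (classId, studentId), keeping the lowest rank it appears at.
      .groupBy("classId", "studentId")
      .agg(min("rank").as("firstRank"))
      // Collect (firstRank, studentId) structs per class; sort_array orders
      // structs by their first field, so the array ends up in rank order.
      .groupBy("classId")
      .agg(sort_array(collect_list(struct(col("firstRank"), col("studentId")))).as("ranked"))
      // Selecting a field of an array of structs yields an array of that field.
      .select(col("classId"), col("ranked.studentId").as("studentIds"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ordered-distinct-students")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("2", "123", 1), ("2", "908", 4),
      ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
      ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3)
    ).toDF("classId", "studentId", "rank")

    orderedStudentIds(df).orderBy("classId").show(false)
    spark.stop()
  }
}
```

For the sample data in the question this should give [123, 5000, 908] for class 1 and [123, 908] for class 2. Sorting explicitly inside the aggregation avoids relying on the row order that a preceding sort or window happens to produce, which a groupBy is not guaranteed to preserve.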