【Question title】: Spark: How to preserve order with collect_set in a partition of a dataframe?
【Posted】: 2021-07-05 21:54:50
【Question】:

I have partitioned the data and sorted it by one column (rank), as follows:

+-------+---------+----+
|classId|studentId|rank|
+-------+---------+----+
|1      |123      |1   |
|1      |5000     |2   |
|1      |5000     |3   |
|1      |5000     |4   |
|1      |908      |5   |
|1      |908      |6   |
|2      |123      |1   |
|2      |123      |2   |
|2      |123      |3   |
|2      |908      |4   |
+-------+---------+----+

Now I want the following output: for each class, an array of distinct studentIds ordered by the rank column.

+-------+----------------------------------+
|classId|studentIds                        |
+-------+----------------------------------+
|1      |[123, 5000, 908]                  |
|2      |[123, 908]                        |
+-------+----------------------------------+
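To pin down exactly what the desired output means, here is a plain-Scala sketch over the question's sample rows (ordinary collections, no Spark; an illustration of the semantics only, not a Spark solution). For each class it sorts the rows by rank, keeps the studentIds, and then drops repeats, keeping the first occurrence of each id.

```scala
// Sample rows from the question: (classId, studentId, rank), in arbitrary order.
val rows = Seq(("2", "123", 1), ("2", "908", 4),
  ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
  ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3))

// Per class: order by rank, project studentId, then drop repeats.
// Seq.distinct keeps the first occurrence of each id, so the rank order survives.
val expected: Map[String, Seq[String]] = rows
  .groupBy(_._1)
  .map { case (classId, group) => classId -> group.sortBy(_._3).map(_._2).distinct }

expected.toSeq.sortBy(_._1).foreach { case (classId, ids) =>
  println(s"$classId -> ${ids.mkString("[", ", ", "]")}")
}
```

This is exactly "collect_list in rank order, then remove duplicates", which is why a plain set (with no ordering guarantee) is the wrong tool here.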

I tried collect_list over the partition, but it gives me duplicates (in the correct order):

+-------+---------------------------------+
|classId|studentIds                       |
+-------+---------------------------------+
|1      |[123, 5000, 5000, 5000, 908, 908]|
|2      |[123, 123, 123, 908]             |
+-------+---------------------------------+

I tried collect_set over the partition; it gives me distinct values, but the studentIds are not in the correct order:

+-------+----------------+
|classId|studentIds      |
+-------+----------------+
|1      |[5000, 123, 908]|
|2      |[123, 908]      |
+-------+----------------+

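Code:

// Assumes spark-shell (or any scope with a SparkSession named `spark`)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

//Sample Data
val simpleData = Seq(("2", "123", 1), ("2", "908", 4),
  ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
  ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3)
)
val df = simpleData.toDF("classId", "studentId", "rank")

//Processing
df.sort(asc("classId"), asc("rank"))
  // Running list of studentIds per class, ordered by rank
  .withColumn("studentIds", collect_list("studentId")
    .over(Window.partitionBy("classId").orderBy("rank")))
  .groupBy("classId")
  // Intended to keep the last (full) running list per class; last() depends on row order
  .agg(last("studentIds") as "studentIds")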

【Question comments】:

    Tags: scala dataframe apache-spark apache-spark-sql


    【Answer 1】:

    You can use the array_distinct function (available since Spark 2.4) to remove the duplicates after collect_list:

    df.sort(asc("classId"), asc("rank"))
      .withColumn("studentIds", array_distinct(collect_list("studentId")
        .over(Window.partitionBy("classId").orderBy("rank"))))
      .groupBy("classId")
      .agg(last("studentIds") as "studentIds")
    

    Output:

    +-------+----------------+
    |classId|studentIds      |
    +-------+----------------+
    |1      |[123, 5000, 908]|
    |2      |[123, 908]      |
    +-------+----------------+
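A sketch of a variant that needs neither a window nor `last()` (whose result Spark documents as depending on row order), assuming Spark 2.4+ for `array_distinct`. It collects `(rank, studentId)` structs per class, sorts them with `sort_array` (structs compare field by field, so `rank` decides the order), projects `studentId` out of the array of structs, and then removes duplicates. The local session setup and app name are assumptions for a standalone run; in `spark-shell` you would reuse the existing `spark`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_distinct, col, collect_list, sort_array, struct}

// Assumed local session for a standalone run.
val spark = SparkSession.builder().master("local[*]").appName("ordered-distinct").getOrCreate()
import spark.implicits._

// Same sample data as in the question.
val df = Seq(("2", "123", 1), ("2", "908", 4),
  ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
  ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3)
).toDF("classId", "studentId", "rank")

val result = df
  .groupBy("classId")
  // Collect (rank, studentId) pairs, then sort them; rank is the first struct
  // field, so the array is ordered by rank regardless of incoming row order.
  .agg(sort_array(collect_list(struct(col("rank"), col("studentId")))).as("pairs"))
  // Selecting a field of an array of structs yields an array of that field;
  // array_distinct then drops the repeated ids.
  .withColumn("studentIds", array_distinct(col("pairs.studentId")))
  .select("classId", "studentIds")
  .orderBy("classId")

result.show(false)
```

The trade-off is one aggregation instead of a window plus an aggregation, and the ordering comes from the data (`rank`) rather than from how rows happen to arrive at `last()`.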
    

    【Comments】:

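    • Thanks, this works. I didn't know you could use it on collect_list. I had worked around it by creating a UDF, val udfRemoveDuplicates = udf{a: Seq[Long] => a.distinct}, and applying it to the final studentIds column.
    • You can check the documentation for the collection functions; it is better to avoid UDFs when possible.
    • If this answer helped, please accept it.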
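For Spark versions before 2.4, where `array_distinct` is not available, the UDF approach from the first comment is a workable fallback. A minimal sketch, assuming `studentId` is a string column as in the sample data, so the UDF is declared with `Seq[String]` to match it; `removeDuplicates` is a hypothetical name, and the local session setup is an assumption for a standalone run. It keeps the question's window-plus-`last()` pipeline and applies the UDF to the final column, as the comment describes.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, last, udf}

// Assumed local session for a standalone run.
val spark = SparkSession.builder().master("local[*]").appName("udf-distinct").getOrCreate()
import spark.implicits._

val df = Seq(("2", "123", 1), ("2", "908", 4),
  ("1", "123", 1), ("1", "5000", 3), ("1", "908", 5), ("1", "5000", 2),
  ("1", "5000", 4), ("1", "908", 6), ("2", "123", 2), ("2", "123", 3)
).toDF("classId", "studentId", "rank")

// Hypothetical UDF name. Seq[String] matches the string studentId column here.
// Seq.distinct keeps the first occurrence of each id, so the collected order survives.
val removeDuplicates = udf((ids: Seq[String]) => ids.distinct)

val result = df
  // Running list of studentIds per class, ordered by rank (as in the question).
  .withColumn("studentIds", collect_list("studentId")
    .over(Window.partitionBy("classId").orderBy("rank")))
  .groupBy("classId")
  // Picks the last running list per class; like the question, this relies on row order.
  .agg(last("studentIds").as("studentIds"))
  .withColumn("studentIds", removeDuplicates(col("studentIds")))
  .orderBy("classId")

result.show(false)
```

On Spark 2.4+ the built-in `array_distinct` does the same job without the serialization overhead of a UDF, which is the point made in the second comment.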