【问题标题】:Spark Dataframe: Group and rank rows on a certain column valueSpark Dataframe:对某个列值的行进行分组和排名
【发布时间】:2020-02-14 03:25:30
【问题描述】:

当“ID”列编号从 1 开始到最大值然后从 1 重置时,我正在尝试对列进行排名。

所以,前三行在“ID”上有一个连续的编号;因此这些应该与组等级 = 1 分组。第 4 行和第 5 行在另一个组中,组排名 = 2。

行按“rownum”列排序。我知道 row_number 窗口函数,但我认为我不能申请这个用例,因为没有常量窗口。我只能想到遍历数据框中的每一行,但不确定当数字重置为 1 时如何更新列。

val df = Seq( (1, 1), (2, 2), (3, 3), (4, 1), (5, 2), (6, 1), (7, 1), (8, 2) ).toDF("rownum", "ID") df.show()

预期结果如下:

【问题讨论】:

  • 您了解超前滞后函数吗?你可以使用它,匹配上一个,检查上一行是1并且当前不是1,然后保持相同的排名,否则增加1。
  • 我使用了前导窗口函数,得到了下一行的“ID”值;我也明白你所说的逻辑明智,但不确定如何在 Spark 中实现。 .withColumn("lead_col", lead(col("ID"), 1).over(Window.orderBy(col("rownum"))))
  • 给我一些时间,会帮助你工作的。

标签: scala apache-spark apache-spark-sql grouping ranking


【解决方案1】:

您可以使用 2 个窗口函数来完成,第一个用于标记状态,第二个用于计算运行总和:

df
  .withColumn("increase", $"ID" > lag($"ID",1).over(Window.orderBy($"rownum")))
  .withColumn("group_rank_of_ID",sum(when($"increase",lit(0)).otherwise(lit(1))).over(Window.orderBy($"rownum")))
  .drop($"increase")
  .show()

给予:

+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
|     1|  1|               1|
|     2|  2|               1|
|     3|  3|               1|
|     4|  1|               2|
|     5|  2|               2|
|     6|  1|               3|
|     7|  1|               4|
|     8|  2|               4|
+------+---+----------------+

【讨论】:

    【解决方案2】:

    正如@Prithvi 所说,我们可以在这里使用lead

    要使用lead这样的窗口函数,我们至少需要提供命令。

    考虑

    
    val nextID = lag('ID, 1, -1) over Window.orderBy('rownum)
    val isNewGroup = 'ID <= nextID cast "integer"
    val group_rank_of_ID = sum(isNewGroup) over Window.orderBy('rownum)
    
    /* you can try 
    df.withColumn("intermediate", nextID).show
    //                           ^^^^^^^-- can be `isNewGroup`, or other vals
    */
    
    
    
    df.withColumn("group_rank_of_ID", group_rank_of_ID).show
    
    /* returns
    +------+---+----------------+
    |rownum| ID|group_rank_of_ID|
    +------+---+----------------+
    |     1|  1|               0|
    |     2|  2|               0|
    |     3|  3|               0|
    |     4|  1|               1|
    |     5|  2|               1|
    |     6|  1|               2|
    |     7|  1|               3|
    |     8|  2|               3|
    +------+---+----------------+
    */
    
    
    df.withColumn("group_rank_of_ID", group_rank_of_ID + 1).show
    
    /* returns
    +------+---+----------------+
    |rownum| ID|group_rank_of_ID|
    +------+---+----------------+
    |     1|  1|               1|
    |     2|  2|               1|
    |     3|  3|               1|
    |     4|  1|               2|
    |     5|  2|               2|
    |     6|  1|               3|
    |     7|  1|               4|
    |     8|  2|               4|
    +------+---+----------------+
    */
    

    【讨论】:

    • 要使用Window,需要import org.apache.spark.sql.expressions.Window
    • 谢谢,这是个好主意。
    猜你喜欢
    • 1970-01-01
    • 2022-12-11
    • 2016-09-19
    • 2014-05-15
    • 2021-03-24
    • 2019-08-20
    • 1970-01-01
    • 2018-10-18
    • 1970-01-01
    相关资源
    最近更新 更多