【问题标题】:Rank per row over multiple columns in Spark DataframeSpark Dataframe 中多列的每行排名
【发布时间】:2019-08-20 15:43:07
【问题描述】:

我正在使用 spark 和 Scala 来转换 Dataframe ,我想在其中计算一个新变量,该变量计算许多变量中每行一个变量的排名。

例子-

Input DF-

+---+---+---+
|c_0|c_1|c_2|
+---+---+---+
| 11| 11| 35|
| 22| 12| 66|
| 44| 22| 12|
+---+---+---+

Expected DF-

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 11| 35|        2|        3|        1|
| 22| 12| 66|       2|       3|       1|
| 44| 22| 12|       1|       2|       3|
+---+---+---+--------+--------+--------+



这已经用 R 回答了 - Rank per row over multiple columns in R

但我需要在 spark-sql 中使用 scala 做同样的事情。谢谢您的帮助!

编辑- 4/1 。遇到一种情况,如果值相同,则排名应该不同。编辑第一行以复制情况。

【问题讨论】:

  • 到目前为止你尝试了什么?
  • 我尝试创建一个包含所有元素的数组类型的新列,然后尝试映射它并在对数组进行排序后使用 zipwithindex 来获取索引。但是在 df 上使用地图后,我被卡住了,无法使用 withcolumn 生成三个等级列。
  • 您能补充更多细节吗?您需要计算的排名是多少?

标签: scala apache-spark apache-spark-sql


【解决方案1】:

如果我理解正确,您希望在每一行中获得每列的排名。

让我们首先定义数据,以及要“排名”的列。

val df = Seq((11,  21,  35),(22,  12, 66),(44, 22 , 12))
    .toDF("c_0", "c_1", "c_2")
val cols = df.columns

然后我们定义一个UDF来查找数组中元素的索引。

val pos = udf((a : Seq[Int], elt : Int) => a.indexOf(elt)+1)

我们最终创建了一个排序数组(按降序排列)并使用 UDF 来查找每列的排名。

val ranks = cols.map(c => pos(col("array"), col(c)).as(c+"_rank"))
df.withColumn("array", sort_array(array(cols.map(col) : _*), false))
  .select((cols.map(col)++ranks) :_*).show 
+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 12| 35|       3|       2|       1|
| 22| 12| 66|       2|       3|       1|
| 44| 22| 12|       1|       2|       3|
+---+---+---+--------+--------+--------+

编辑: 从 Spark 2.4 开始,我定义的 pos UDF 可以替换为内置函数 array_position(column: Column, value: Any),其工作方式完全相同(第一个索引为 1)。这样可以避免使用效率稍低的 UDF。

编辑2: 如果您有重复的键,上面的代码将生成重复的索引。如果你想避免它,你可以创建数组,压缩它以记住哪一列是哪一列,对其进行排序并再次压缩它以获得最终排名。它看起来像这样:

val colMap = df.columns.zipWithIndex.map(_.swap).toMap
val zip = udf((s: Seq[Int]) => s
    .zipWithIndex
    .sortBy(-_._1)
    .map(_._2)
    .zipWithIndex
    .toMap
    .mapValues(_+1))
val ranks = (0 until cols.size)
    .map(i => 'zip.getItem(i) as colMap(i) + "_rank")
val result = df
    .withColumn("zip", zip(array(cols.map(col) : _*)))
    .select(cols.map(col) ++ ranks :_*)

【讨论】:

  • 这行得通,已经接受了,只是想知道如果在大型数据集上使用 udf 是否会影响性能,因为它的性质是 ser-deser ,另外你对并行性有什么想法,会这样吗?不易OOM。
  • 这段代码根本不容易出现OOM。这是一个简单的按行计算,将完美分布。无需以任何方式对行进行分组,驱动程序上也没有任何内容,因此您很安全。另外因为您询问了 UDF,我检查了从 spark 2.4 开始,内置函数可以替换我的 UDF(我编辑了我的答案以提及它)。然而,即使使用 UDF,我认为性能也不会受到太大影响。如果您两者都尝试,请告诉我们 ;-)
  • 感谢 Oli,将对此进行检查并返回性能结果。
  • 如果元素值相同,我想这将返回相同的索引(在这种情况下为排名)。我希望它们增加。(11,11,35 应该返回 2.3,1)我应该在 udf 中添加逻辑还是你相信有更简单的方法。
  • 使用此方法,不能有任何重复的索引。实际上,一个元素在数组中只能有一个索引。顺便说一句,如果您使用的是 Spark 2.4,您甚至不需要 UDF ;)
【解决方案2】:

解决此问题的一种方法是使用 windows。

val df = Seq((11,  21,  35),(22,  12, 66),(44, 22 , 12))
    .toDF("c_0", "c_1", "c_2")
(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
          d.withColumn(column+"_rank", rank() over Window.orderBy(desc(column))))
    .show
+---+---+---+--------+--------+--------+                                        
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 22| 12| 66|       2|       3|       1|
| 11| 21| 35|       3|       2|       2|
| 44| 22| 12|       1|       1|       3|
+---+---+---+--------+--------+--------+

但这不是一个好主意。所有的数据最终都会放在一个分区中,如果所有数据都不能放入一个执行器中,则会导致 OOM 错误。

另一种方法需要对数据框进行 3 次排序,但至少可以扩展到任何大小的数据。

让我们定义一个函数来压缩具有连续索引的数据帧(它存在于 RDD,但不存在于数据帧)

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
    val rdd = df.rdd.zipWithIndex
      .map{ case (row, i) => Row.fromSeq(row.toSeq :+ (i+1)) }
    val newSchema = df.schema.add(StructField(name, LongType, false))
    df.sparkSession.createDataFrame(rdd, newSchema)
}

让我们在同一个数据帧上使用它df

(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
        zipWithIndex(d.orderBy(desc(column)), column+"_rank"))
    .show

提供与上述完全相同的结果。

【讨论】:

  • 我对用例类有另一个建议。这将有助于不将其转换为 RDD 并直接使用 Spark Dataset
  • 感谢及时回复,但输出与预期输出不符。我希望最高值的最低排名..例如 - c_0_rank 应该是 1,因为 c_0 在第一行的值为 44 而不是 3。
  • 对,我错过了它按降序排序的事实。我编辑了我的答案。它已修复。
  • 第一行是正确的,第二行和第三行的输出仍然不正确。请检查问题中的预期 DF,我已将其格式化以更清晰。
  • 由于我没有理解你的问题,所以我发布了一个新的答案。
【解决方案3】:

您可能可以创建一个窗口函数。请注意,如果您有太多数据,这很容易受到 OOM 的影响。但是,我只是想在这里介绍一下窗口函数的概念。

inputDF.createOrReplaceTempView("my_df")
val expectedDF =  spark.sql("""
    select 
        c_0
        , c_1
        , c_2
        , rank(c_0) over (order by c_0 desc) c_0_rank
        , rank(c_1) over (order by c_1 desc) c_1_rank
        , rank(c_2) over (order by c_2 desc) c_2_rank 
    from my_df""")
expectedDF.show()

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 44| 22| 12|       3|       3|       1|
| 11| 21| 35|       1|       2|       2|
| 22| 12| 66|       2|       1|       3|
+---+---+---+--------+--------+--------+

【讨论】:

  • 所需的输出应该是-c_0,c_1,c_2,c_0_rank,c_1_rank,c_2_rank 44,22,12,1,2,3 11,21,35,3,2,1 22,12,66,2,3,1 具有最低排名的最高数字,其中 c_n_rank 列指定 c_n 列值的相应排名。
  • 已编辑以按等级 desc 排序
  • 请检查有问题的预期 DF,已对其进行编辑以提高清晰度
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-01-06
  • 1970-01-01
  • 2016-07-08
  • 2016-06-06
相关资源
最近更新 更多