【问题标题】:Computing rank of a row计算行的秩
【发布时间】:2016-02-17 05:18:42
【问题描述】:

我想根据一个字段对用户 ID 进行排名。对于相同的字段值,排名应该相同。该数据在 Hive 表中。

例如

user value
a       5
b       10
c       5
d       6

Rank
a - 1
c - 1
d - 3
b - 4

我该怎么做?

【问题讨论】:

    标签: scala apache-spark dataframe hive apache-spark-sql


    【解决方案1】:

    可以通过 DataFrame API 使用 rank 窗口函数:

    import org.apache.spark.sql.functions.rank
    import org.apache.spark.sql.expressions.Window
    
    val w = Window.orderBy($"value")
    
    val df = sc.parallelize(Seq(
      ("a", 5), ("b", 10), ("c", 5), ("d", 6)
    )).toDF("user", "value")
    
    df.select($"user", rank.over(w).alias("rank")).show
    
    // +----+----+
    // |user|rank|
    // +----+----+
    // |   a|   1|
    // |   c|   1|
    // |   d|   3|
    // |   b|   4|
    // +----+----+
    

    或原始 SQL:

    df.registerTempTable("df")
    sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show
    
    // +----+----+
    // |user|rank|
    // +----+----+
    // |   a|   1|
    // |   c|   1|
    // |   d|   3|
    // |   b|   4|
    // +----+----+
    

    但效率极低。

    您也可以尝试使用 RDD API,但这并不完全简单。首先让我们将 DataFrame 转换为 RDD:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    import org.apache.spark.RangePartitioner
    
    val rdd: RDD[(Int, String)] = df.select($"value", $"user")
      .map{ case Row(value: Int, user: String) => (value, user) }
    
    val partitioner = new RangePartitioner(rdd.partitions.size,  rdd)
    val sorted =  rdd.repartitionAndSortWithinPartitions(partitioner)
    

    接下来我们必须计算每个分区的排名:

    def rank(iter: Iterator[(Int,String)]) =  {
      val zero = List((-1L, Integer.MIN_VALUE, "", 1L))
    
      def f(acc: List[(Long,Int,String,Long)], x: (Int, String)) = 
        (acc.head, x) match {
          case (
            (prevRank: Long, prevValue: Int, _, offset: Long),
            (currValue: Int, label: String)) => {
          val newRank = if (prevValue == currValue) prevRank else prevRank + offset
          val newOffset = if (prevValue == currValue) offset + 1L else 1L
          (newRank, currValue, label, newOffset) :: acc
        }
      }
    
      iter.foldLeft(zero)(f).reverse.drop(1).map{case (rank, _, label, _) =>
        (rank, label)}.toIterator
    }
    
    
    val partRanks = sorted.mapPartitions(rank)
    

    每个分区的偏移量

    def getOffsets(sorted: RDD[(Int, String)]) = sorted
      .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) => 
        Iterator((i, iter.size)))
      .collect
      .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) => 
        (x._1, x._2 + acc.head._2) :: acc)
      .toMap
    
    val offsets = sc.broadcast(getOffsets(sorted))
    

    最后的排名:

    def adjust(i: Int, iter: Iterator[(Long, String)]) = 
      iter.map{case (rank, label) => (rank + offsets.value(i - 1).toLong, label)}
    
    val ranks = partRanks
      .mapPartitionsWithIndex(adjust)
      .map{case (i, label) => (1 + i , label)}
    

    【讨论】:

    • 我认为这是一个很好的答案,但是,我们能否详细说明为什么 dataframe API 在这里效率低下?
    • @BlueSky 因为没有partitionByWindow 定义将所有内容都洗牌到单个分区。使用今天的Daset API,您可以重写RDD 版本。
    • 补充@zero323的答案,即使使用partitionBy也可能效率低下 - 例如在某些类型的交易数据中,少数客户持有绝大多数交易可能很常见;我遇到过银行数据,其中一位客户有效地拥有全行所有交易的 45%,这是因为该银行是做市商,并且(在数据中)是其自身的客户。
    猜你喜欢
    • 2012-02-28
    • 1970-01-01
    • 2018-02-16
    • 1970-01-01
    • 2012-06-23
    • 2012-08-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多