【问题标题】:Top N items from a Spark DataFrame/RDDSpark DataFrame/RDD 中的前 N ​​个项目
【发布时间】:2018-02-13 20:28:12
【问题描述】:

我的要求是从数据框中获取前 N 个项目。

我有这个数据框:

val df = List(
      ("MA", "USA"),
      ("MA", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA")).toDF("value", "country")

我能够将其映射到RDD[((Int, String), Long)] colValCount: 读取:((colIdx, value), count)

((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)

现在我需要获取每个列索引的前 2 个项目。所以我的预期输出是这样的:

RDD[((Int, String), Long)]

((0,CT),5)
((0,NY),6)
((1,USA),17)

我尝试在 DataFrame 中使用 freqItems api,但速度很慢。

欢迎提出任何建议。

【问题讨论】:

  • 我认为您需要 sort()limit() 的某种组合,但我不明白您是如何获得输出的。

标签: scala apache-spark top-n


【解决方案1】:

例如:

import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
   .union(df.select(lit(1), $"country"))
   .groupBy($"index", $"value")
   .count
  .orderBy($"count".desc)
  .limit(3)
  .show

// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+

地点:

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

创建一个两列DataFrame

// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+

如果您希望每列有两个值:

import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int)  = {
   df.select(
        lit(df.columns.indexOf(key)).alias("index"), 
        col(key).alias("value"))
     .groupBy("index", "value")
     .count
     .orderBy($"count")
     .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+ 
// |index|value|count|
// +-----+-----+-----+
// |    0|   MA|    2|
// |    0|   OH|    4|
// |    1|  USA|   17|
// +-----+-----+-----+

就像pault said - 只是“sort()limit() 的某种组合”。

【讨论】:

  • orderBy()limit() 的某种组合...基本上就是我所说的 :-)
  • @pault 是的,我猜是 groupByagg 的某种组合,这里不时带有窗口函数 :-)
  • .orderBy($"count".desc).limit(3) 在这种情况下给出了指定的结果,但在一般情况下它没有给出每个列索引的前 2 项。
  • 谢谢让我试试看!
  • @user8371915 这可行,但对于 TB 数据仍然很慢,因为我也在运行 170 多列。性能与我之前提到的 freqItems API 相当。处理整个数据大约需要 1 个多小时。
【解决方案2】:

执行此操作的最简单方法(自然窗口函数)是编写 SQL。 Spark 自带 SQL 语法,而 SQL 是解决这个问题的绝佳工具。

将您的数据框注册为临时表,然后对其进行分组和窗口化。

spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r 
             FROM (
               SELECT idx, value, COUNT(*) as c 
               FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df) 
               GROUP BY idx, value) 
             HAVING r <= 2""").show()

我想看看是否有任何程序/scala 方法可以让您在没有迭代或循环的情况下执行窗口函数。我不知道 Spark API 中有任何支持它的东西。

顺便说一句,如果您想要包含任意数量的列,那么您可以很容易地使用列列表动态生成内部部分(SELECT 0 as idx, value ... UNION ALL SELECT 1, country 等)。

【讨论】:

  • 谢谢,我已经尝试使用它抱怨 GC 开销的窗口函数。我也会试试你的解决方案。
【解决方案3】:

鉴于您的最后一个 RDD:

val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)
    ))

rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)

我们首先获取给定列索引 (.filter(_._1._1 == 0)) 的项目。然后我们按降序对项目进行排序 (.sortBy(-_._2))。最后,我们最多取前 2 个元素 (.take(2)),如果记录的 nbr 小于 2,则只取 1 个元素。

【讨论】:

    【解决方案4】:

    您可以使用 Sparkz 中定义的这个辅助函数映射每个单独的分区,然后将它们组合在一起:

    package sparkz.utils
    
    import scala.reflect.ClassTag
    
    object TopElements {
      def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
        elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) {
          case (accumulator@(topElems, minScore), elem) =>
            val score = scoreFunc(elem)
            if (topElems.size < n)
              (topElems + (elem -> score), math.min(minScore, score))
            else if (score > minScore) {
              val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
              (newTopElems, newTopElems.map(_._2).min)
            }
            else accumulator
        }
          ._1.toList.sortBy(_._2).reverse.map(_._1)
    }
    

    来源:https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala

    【讨论】:

      猜你喜欢
      • 2020-03-14
      • 1970-01-01
      • 2017-02-03
      • 2017-04-11
      • 2022-11-02
      • 2017-08-16
      • 2016-03-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多