【问题标题】:Spark groupby multiple columns separatelySpark groupby 多列分别
【发布时间】:2018-10-30 23:19:21
【问题描述】:

我有一个如下所示的 Spark 数据框,我想通过彼此独立的不同列对其执行一些聚合函数,并获取单个列的一些统计信息。

val df = (Seq((1, "a", "1"),
              (1,"b", "3"),
              (1,"c", "6"),
              (2, "a", "9"),
              (2,"c", "10"),
              (1,"b","8" ),
              (2, "c", "3"),
              (3,"r", "19")).toDF("col1", "col2", "col3"))

df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|   1|
|   1|   b|   3|
|   1|   c|   6|
|   2|   a|   9|
|   2|   c|  10|
|   1|   b|   8|
|   2|   c|   3|
|   3|   r|  19|
+----+----+----+

我想按 col1 和 col2 分组,并得到 col3 列的平均值,得到以下输出数据帧:

+----+----+----+---------+---------+
|col1|col2|col3|mean_col1|mean_col2|
+----+----+----+---------+---------+
|   1|   a|   1|      4.5|      5.0|
|   1|   b|   3|      4.5|      5.5|
|   1|   c|   6|      4.5|     6.33|
|   2|   a|   9|     7.33|      5.0|
|   2|   c|  10|     7.33|     6.33|
|   1|   b|   8|      4.5|      5.5|
|   2|   c|   3|     7.33|     6.33|
|   3|   r|  19|     19.0|     19.0|
+----+----+----+---------+---------+

这可以通过以下操作来完成:

val col1df = df.groupBy("col1").agg(round(mean("col3"),2).alias("mean_col1"))

val col2df = df.groupBy("col2").agg(round(mean("col3"),2).alias("mean_col2"))

df.join(col1df, "col1").join(col2df, "col2").select($"col1",$"col2",$"col3",$"mean_col1",$"mean_col2").show()

但是,如果我要分组的列更多,我需要执行几个昂贵的连接操作。此外,在进行连接之前按每列分组似乎相当麻烦。通过最小化(最好是消除)连接操作并且不必生成数据帧 col1df 和 col2df 来获取输出数据帧的最佳方法是什么?

【问题讨论】:

    标签: apache-spark join group-by apache-spark-sql aggregate


    【解决方案1】:

    由于您希望最终表格包含所有原始行,这可以通过 window 函数来完成。

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    val df = (Seq((1, "a", "1"),
        (1,"b", "3"),
        (1,"c", "6"),
        (2, "a", "9"),
        (2,"c", "10"),
        (1,"b","8" ),
        (2, "c", "3"),
        (3,"r", "19")).toDF("col1", "col2", "col3"))
    
      df.show(false)
    
      val col1Window = Window.partitionBy("col1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      val col2Window = Window.partitionBy("col2").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    
      val res = df
                  .withColumn("mean_col1", round(mean("col3").over(col1Window), 2))
                  .withColumn("mean_col2", round(mean("col3").over(col2Window), 2))
    
      res.show(false)
    

    在 Window 函数的上下文中,partitionBy 类似于 groupBy,rangeBetween 定义了窗口的大小,即所有具有相同值的行分区列,或者可以将其视为按列分组。

    【讨论】:

    • 这行得通,谢谢。你知道在上面的例子中使用 partitionBy 是否有更快的方法来获取 col1df 和 col2df,还是它没有提供比 groupBy 的性能改进?
    • 有一种方法,对集合进行分组,但它不会为您提供您指定的确切答案,它会保留原始行。可能需要加入一些帖子。
    猜你喜欢
    • 2018-07-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-09
    相关资源
    最近更新 更多