【问题标题】:Spark : Is there differences between agg function and a window function on a spark dataframe?Spark:spark 数据帧上的 agg 函数和窗口函数之间有区别吗?
【发布时间】:2019-08-24 21:51:27
【问题描述】:

我想对 spark Dataframe (Spark 2.1) 中的列应用总和,我有两种方法可以做到这一点:

1- 带窗口功能:

val windowing = Window.partitionBy("id")
dataframe
.withColumn("sum", sum(col("column_1")) over windowing)

2- 使用 agg 函数:

dataframe
.groupBy("id")
.agg(sum(col("column_1")).alias("sum"))

就表演而言,最好的方法是什么?这两种方法有什么区别?

【问题讨论】:

    标签: apache-spark dataframe aggregate-functions window-functions


    【解决方案1】:

    正如@Oli 提到的聚合函数可以在窗口(第一种情况)以及分组(第二种情况)中使用。在性能方面,“带分组的聚合函数”比“带窗口的聚合函数”要快得多。我们可以通过分析物理计划来可视化这一点。

    df.groupBy("id").agg(sum($"expense").alias("total_expense")).explain()
    df.show
    +---+----------+                                                                   
    |  id|  expense|
    +---+----------+
    |   1|      100|
    |   2|      300|
    |   1|      100|
    |   3|      200|
    +---+----------+
    

    1- 与窗口聚合:

    df.withColumn("total_expense", sum(col("expense")) over window).show
    +---+----------+-------------------+                                                     
    | id|   expense|      total_expense|
    +---+----------+-------------------+
    |  3|       200|                200|
    |  1|       100|                200|
    |  1|       100|                200|
    |  2|       300|                300|
    +---+----------+-------------------+
    
    df.withColumn("total_expense", sum(col("expense")) over window).explain
    == Physical Plan ==
    Window [sum(cast(expense#9 as bigint)) windowspecdefinition(id#8, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total_expense#265L], [id#8]
    +- *(2) Sort [id#8 ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id#8, 200), true, [id=#144]
          +- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
             +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
                +- Scan[obj#2]
    

    2- 使用 GroupBy 进行聚合:

    df.groupBy("id").agg(sum($"expense").alias("total_expense")).show
    +---+------------------+                                                             
    | id|     total_expense|
    +---+------------------+
    |  3|               200|
    |  1|               200|
    |  2|               300|
    +---+------------------+
    
    df.groupBy("id").agg(sum($"expense").alias("total_expense")).explain()
        == Physical Plan ==
        *(2) HashAggregate(keys=[id#8], functions=[sum(cast(expense#9 as bigint))])
        +- Exchange hashpartitioning(id#8, 200), true, [id=#44]
           +- *(1) HashAggregate(keys=[id#8], functions=[partial_sum(cast(expense#9 as bigint))])
              +- *(1) Project [_1#3 AS id#8, _2#4 AS expense#9]
                 +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#4]
                    +- Scan[obj#2]
    

    根据执行计划,我们可以看到在 windows 的情况下,有一个总 shuffle 和一个 sort,而在 groupby 的情况下,有一个 reduced shuffle(在本地聚合 partial_sum 之后的shuffle)。

    【讨论】:

      【解决方案2】:

      您可以在窗口内(第一种情况)或分组时(第二种情况)使用聚合函数。不同之处在于,对于一​​个窗口,每个 将与在其整个窗口上计算的聚合结果相关联。但是,在分组时,每个 都将与该组的聚合结果相关联(一组行仅成为一行)。

      在你的情况下,你会得到这个。

      val dataframe = spark.range(6).withColumn("key", 'id % 2)
      dataframe.show
      +---+---+
      | id|key|
      +---+---+
      |  0|  0|
      |  1|  1|
      |  2|  0|
      |  3|  1|
      |  4|  0|
      |  5|  1|
      +---+---+
      

      案例 1:窗口化

      val windowing = Window.partitionBy("key")
      dataframe.withColumn("sum", sum(col("id")) over windowing).show
      +---+---+---+                                                                   
      | id|key|sum|
      +---+---+---+
      |  0|  0|  6|
      |  2|  0|  6|
      |  4|  0|  6|
      |  1|  1|  9|
      |  3|  1|  9|
      |  5|  1|  9|
      +---+---+---+
      

      案例 2:分组

      dataframe.groupBy("key").agg(sum('id)).show
      +---+-------+
      |key|sum(id)|
      +---+-------+
      |  0|      6|
      |  1|      9|
      +---+-------+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2010-09-05
        • 2014-03-16
        • 2011-08-31
        • 1970-01-01
        • 1970-01-01
        • 2014-11-21
        • 1970-01-01
        相关资源
        最近更新 更多