【问题标题】:Performance comparison between groupBy + join vs window func SparkgroupBy + join vs window func Spark之间的性能比较
【发布时间】:2021-06-04 11:09:58
【问题描述】:

这两个实现几乎一样(唯一不同的是行的顺序):

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
def windowSpec = Window.partitionBy("key")
val df1 = Seq((3, "A", 5), (1, "A", 2), (3, "A", 5), (3, "B", 13)).toDF("key", "Categ1", "value")  
df1.withColumn("avg", avg("value").over(windowSpec)).show
+---+------+-----+-----------------+
|key|Categ1|value|              avg|
+---+------+-----+-----------------+
|  1|     A|    2|              2.0|
|  3|     A|    5|7.666666666666667|
|  3|     A|    5|7.666666666666667|
|  3|     B|   13|7.666666666666667|
+---+------+-----+-----------------+

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val df1 = Seq((3, "A", 5), (1, "A", 2), (3, "A", 5), (3, "B", 13)).toDF("key", 
val df2 = df1.groupBy("key").agg(avg("value") as "avg")
df1.join(df2,  Seq("key")).show
+---+------+-----+-----------------+
|key|Categ1|value|              avg|
+---+------+-----+-----------------+
|  3|     A|    5|7.666666666666667|
|  1|     A|    2|              2.0|
|  3|     A|    5|7.666666666666667|
|  3|     B|   13|7.666666666666667|
+---+------+-----+-----------------+

乍一看,我认为第一个应该更快,因为它避免了join,但根据经验,有时会得出不同的结论。

window func 有额外的开销吗? window funcgroupBy 之间是否存在复杂性差异?谢谢。

附带问题
使用window func时是否无法保证类型安全?
相关:How use Window aggrgates on strongly typed Spark Datasets?

【问题讨论】:

  • @Hogan 你说的是哪种索引? Spark 中没有索引的概念——Spark 不是数据库,而是支持 SQL 的批处理框架。

标签: sql apache-spark group-by window-functions static-typing


【解决方案1】:

你可以用explain方法看看window的解决方案实物图:

== Physical Plan ==
Window [avg(cast(value#9 as bigint)) windowspecdefinition(key#7, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg#14], [key#7]
+- *(1) Sort [key#7 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#7, 200)
      +- LocalTableScan [key#7, Categ1#8, value#9]

它会加载数据,打乱数据以获得每个分区 1 个键,然后计算平均值。

第二个解决方案:

== Physical Plan ==
*(3) Project [key#7, Categ1#8, value#9, avg#24]
+- *(3) BroadcastHashJoin [key#7], [key#27], Inner, BuildRight
   :- LocalTableScan [key#7, Categ1#8, value#9]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *(2) HashAggregate(keys=[key#27], functions=[avg(cast(value#29 as bigint))], output=[key#27, avg#24])
         +- Exchange hashpartitioning(key#27, 200)
            +- *(1) HashAggregate(keys=[key#27], functions=[partial_avg(cast(value#29 as bigint))], output=[key#27, sum#37, count#38L])
               +- LocalTableScan [key#27, value#29]

执行计划要复杂得多,加载数据,随机播放以获得 1 个 key par 分区。您的数据示例非常小,因此我的 spark 使用 broadcastJoin 而不是 sortMergeJoin 或 ShuffleHashjoin .. 但它可能会再次导致 shuffle 然后加入。

它还会加载您的 df1 2 次,因为您的数据没有持久化,并且第一个解决方案接缝更容易理解。

第一个解决方案更好。

【讨论】:

  • 认为是一个软弱的陈述。
  • 很好的分析,谢谢。第二个有HashAggregate 两次,我认为是由groupBy 操作引起的,而第一个没有HashAggregate,为什么?它不应该对分组做同样的事情吗?相反,它使用Sort,对此有什么想法吗?
  • 优化器并不总是完美的。
  • 排序是为了计算每个键的平均值,然后计算下一个键
【解决方案2】:
  • 撇开 1) 没有明确设置“随机分区”的数量,以及 2) 未说明设置 AQE 真或假,以及 3) 存在少量数据的事实,

    • 然后注意没有缓存(恕我直言,没有必要),

然后接近1我跑了:

val dfA = spark.table("ZZZ")
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
def windowSpec = Window.partitionBy("number2")
dfA.withColumn("avg", avg("number").over(windowSpec)).explain

显示:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [number#73, lit#74, number2#75, avg(_w0#85L) windowspecdefinition(number2#75, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg#80], [number2#75]
   +- Sort [number2#75 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(number2#75, 200), true, [id=#141]
         +- Project [number#73, lit#74, number2#75, cast(number#73 as bigint) AS _w0#85L]
            +- FileScan parquet default.zzz[number#73,lit#74,number2#75] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[dbfs:/user/hive/warehouse/zzz], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<number:int,lit:string>

排序是为了在分区内可以按顺序读取相同的键值,然后求和并除以该键的键值总数,以在有新键值时和最后得到平均值。这是最快的方法,因为它没有第二个选项中的 JOIN。

然后接近2我跑了:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
spark.conf.set("spark.sql.adaptive.enabled",false)
val dfB = spark.table("ZZZ")
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val dfC = dfB.groupBy("number2").agg(avg("number") as "avg")
dfB.join(dfC, Seq("number2")).explain

显示:

== Physical Plan ==
*(4) Project [number2#75, number#73, lit#74, avg#500]
+- *(4) SortMergeJoin [number2#75], [number2#505], Inner
   :- Sort [number2#75 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(number2#75, 200), true, [id=#1484]
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.zzz[number#73,lit#74,number2#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/zzz/number2=0, dbfs:/user/hive/warehouse/zzz/number2=..., PartitionFilters: [isnotnull(number2#75)], PushedFilters: [], ReadSchema: struct<number:int,lit:string>
   +- *(3) Sort [number2#505 ASC NULLS FIRST], false, 0
      +- *(3) HashAggregate(keys=[number2#505], functions=[finalmerge_avg(merge sum#512, count#513L) AS avg(cast(number#503 as bigint))#499])
         +- Exchange hashpartitioning(number2#505, 200), true, [id=#1491]
            +- *(2) HashAggregate(keys=[number2#505], functions=[partial_avg(cast(number#503 as bigint)) AS (sum#512, count#513L)])
               +- *(2) ColumnarToRow
                  +- FileScan parquet default.zzz[number#503,number2#505] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/zzz/number2=0, dbfs:/user/hive/warehouse/zzz/number2=..., PartitionFilters: [isnotnull(number2#505)], PushedFilters: [], ReadSchema: struct<number:int>

此选项涉及 JOIN 和 2 次静态数据读取。如前所述,“自加入”和在某些情况下需要帮助的联合,应根据上面的票证使用重用交换,但事实并非如此。这里我们有一个“自我加入”的元素恕我直言,但 Catalyst 有不同的看法。如您在设置中所见,这是一种非 AQE 方法。它要复杂得多。

结论:

这种启用 AQE 的查询可以自行适应使用广播 哈希联接,因此在此处禁用 AQE。

第一个选项是要走的路,因为它不会从休息中读取两次,并且 不需要 JOIN。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-28
    • 2019-02-05
    • 2011-07-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多