【问题标题】:spark join performance: multiple column vs a single column火花连接性能:多列与单列
【发布时间】:2019-10-08 13:01:55
【问题描述】:

如果我在df1 中有[a,b,c] 列,在df2 中有[a,b,c] 列,还有d 列,那么d=concat_ws('_', *[a,b,c]) 在这两个列中是否会有性能差异:

  1. df1.join(df2, [a,b,c])
  2. df1.join(df2, d)

?

【问题讨论】:

标签: apache-spark apache-spark-sql


【解决方案1】:

这个问题不能用 yesno 来回答,因为答案取决于 DataFrame 的详细信息。

连接的性能在很大程度上取决于执行它需要多少改组的问题。如果连接的两侧都由相同的列分区,则连接会更快。通过查看join的执行计划可以看到分区的效果。

我们创建了两个 DataFrames df1df2 与列 abcd

val sparkSession = ...
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
val cols = Seq("a","b","c")
def createDf = (1 to 3).map(i => (i,i,i)).toDF(cols:_*).withColumn("d", concat_ws("_", cols.map(col):_*))
val df1 = createDf
val df2 = createDf

df1df2 看起来都一样:

+---+---+---+-----+
|  a|  b|  c|    d|
+---+---+---+-----+
|  1|  1|  1|1_1_1|
|  2|  2|  2|2_2_2|
|  3|  3|  3|3_3_3|
+---+---+---+-----+

当我们按列 d 对两个 DataFrame 进行分区并将此列用作连接条件时

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()

我们得到执行计划

== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
   :- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(d#13, 4)
   :     +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)

通过d 对两个DataFrame 进行分区,但加入abc

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()

导致执行计划

== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
   :- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
   :     +- Exchange hashpartitioning(d#13, 4)
   :        +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)

其中包含比第一个计划多一个Exchange hashpartitioning。在这种情况下,abc 的连接会更慢。

另一方面,如果数据帧由abc 分区,则abc 的连接将比d 的连接更快。

【讨论】:

    【解决方案2】:

    我怀疑在没有连接的情况下加入会更快,因为只散列单个字符串而不是连接然后散列可能更便宜。前者涉及较少需要 GC 的 java 对象,但这不是完整的答案。

    请注意,这可能不是您的查询的性能限制步骤,因此任何一种方式都一样快。在性能调整方面,最好进行测试,而不是在没有数据的情况下猜测。

    同样如上所述,如果输入数据已正确分区,则不连接列使优化器有机会消除连接上的交换。

    df1.join(df2, [a,b,c])
    df1.join(df2, d)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-01-08
      • 1970-01-01
      • 2017-11-16
      • 2019-08-25
      • 2015-07-09
      • 1970-01-01
      • 2016-12-09
      相关资源
      最近更新 更多