这个问题不能用 yes 或 no 来回答,因为答案取决于 DataFrame 的详细信息。
连接的性能在很大程度上取决于执行它需要多少改组的问题。如果连接的两侧都由相同的列分区,则连接会更快。通过查看join的执行计划可以看到分区的效果。
我们创建了两个 DataFrames df1 和 df2 与列 a、b、c 和 d:
val sparkSession = ...
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
val cols = Seq("a","b","c")
def createDf = (1 to 3).map(i => (i,i,i)).toDF(cols:_*).withColumn("d", concat_ws("_", cols.map(col):_*))
val df1 = createDf
val df2 = createDf
df1 和 df2 看起来都一样:
+---+---+---+-----+
| a| b| c| d|
+---+---+---+-----+
| 1| 1| 1|1_1_1|
| 2| 2| 2|2_2_2|
| 3| 3| 3|3_3_3|
+---+---+---+-----+
当我们按列 d 对两个 DataFrame 进行分区并将此列用作连接条件时
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()
我们得到执行计划
== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
:- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)
通过d 对两个DataFrame 进行分区,但加入a、b 和c
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()
导致执行计划
== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
:- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)
其中包含比第一个计划多一个Exchange hashpartitioning。在这种情况下,a、b、c 的连接会更慢。
另一方面,如果数据帧由a、b 和c 分区,则a、b、c 的连接将比d 的连接更快。