要合并来自 2 个或更多 spark Dataframes 的记录,连接是必要的。
如果您的数据没有很好地分区/分桶,则会导致Shuffle join。其中每个节点都与每个其他节点通信,并且它们根据哪个节点具有某个密钥或一组密钥(您正在加入)共享数据。这些连接很昂贵,因为网络可能会被流量拥塞。
如果满足以下条件,则可以避免洗牌:
- 两个数据帧都有一个已知的分区器或分桶。
- 其中一个数据集小到可以放入内存,在这种情况下,我们可以进行广播哈希连接
分区
如果您在连接之前对数据进行了正确的分区,那么最终的执行效率会大大提高,因为即使计划了 shuffle,如果来自两个不同 DataFrame 的数据已经位于同一台机器上,Spark 也可以避免 shuffle .
df1.repartition(col("id"))
df2.repartition(col("id"))
// you can optionally specify the number of partitions like:
df1.repartition(10, col("id"))
// Join Dataframes on id column
df1.join(df2, "id") // this will avoid the duplicate id columns in output DF.
广播哈希加入
当其中一个数据集小到足以放入单个工作节点的内存时,我们可以优化我们的连接。
Spark 会将小型 DataFrame 复制到集群中的每个工作节点(无论它位于一台机器上还是多台机器上)。现在这听起来很昂贵。但是,这样做会阻止我们在整个加入过程中执行全对全通信。相反,它在开始时只执行一次,然后让每个单独的工作节点执行工作,而无需等待或与任何其他工作节点通信。
import org.apache.spark.sql.functions.broadcast
// explicitly specify the broadcast hint, though spark handles it.
df1.join(broadcast(df2), "id")