【问题标题】:Spark's Shuffle Sort Merge Join. One DataFrame is bucketed. Does Spark take advantage of this?Spark 的 Shuffle Sort Merge Join。一个 DataFrame 被分桶。 Spark 是否利用了这一点?
【发布时间】:2020-11-26 13:32:54
【问题描述】:

我记得在使用 RDD 时,如果一个键值 RDD (rdd1) 具有已知的分区,那么使用不同的未分区键值 RDD (rdd2) 执行连接将带来性能优势。这是因为 1) 只有 rdd2 的数据需要通过网络传输,2) 通过将 rdd1 的键的分区应用于rdd2的key

我正在学习使用 DataFrames 的 Shuffle Sort Merge Joins。我正在阅读的书(Learning Spark,第 2 版)中的示例是用于连接两个基于 user_id 列的 DataFrame。该示例试图演示从联接操作中消除 Exchange 阶段,因此,在联接之前,两个 DataFrame 都按要联接的列分桶到相同数量的桶中。

我的问题是,如果只有一个 DataFrame 被分桶,会发生什么?显然,交换阶段将重新出现。但是如果我们知道 DataFrame1 被我们想要加入的列分桶到 N 个桶中,Spark 是否会使用这个分桶信息在网络上有效地传输 DataFrame2 的行,就像在 RDD 的情况下一样? Spark 会将 DataFrame1 的行保留在原处,并且只对 DataFrame2 应用相同的分桶吗? (假设 N 个桶导致分区中有合理数量的数据被执行程序加入)或者,Spark 是否低效地洗牌两个 DataFrame?

特别是,我可以想象这样一种情况,即我有一个“主”数据帧,我需要针对该数据帧与同一列上的其他补充数据帧执行许多独立连接。当然,只需要预先存储主 DataFrame 才能看到所有连接的性能优势吗? (虽然我认为不厌其烦地存储补充 DataFrames 也不会受到伤害)

【问题讨论】:

  • 为什么不自己尝试一下 .explain?
  • @thebluephantom 因为我对单节点本地模式了解得不够多,无法知道答案是确定的(我没有集群可以尝试)
  • .explain 仍然会告诉你。不过你可以登录databricks社区版试试看。

标签: apache-spark


【解决方案1】:

https://kb.databricks.com/data/bucketing.html 这解释了这一切,并在我总结的原始帖子上进行了一些修饰。

底线:

val t1 = spark.table("unbucketed")
val t2 = spark.table("bucketed")
val t3 = spark.table("bucketed")

Unbucketed - 分桶连接。两边都需要重新分区。

t1.join(t2, Seq("key")).explain()

使用重新分区取消分桶 - 分桶连接。未分包的一面是 正确重新分区,只需要一次随机播放。

t1.repartition(16, $"key").join(t2, Seq("key")).explain()

使用不正确的重新分区取消分桶(默认(200) - 分桶连接。 未分桶的一侧被错误地重新分区,并且两次洗牌 需要。

t1.repartition($"key").join(t2, Seq("key")).explain()

bucketed - 分桶连接。理想情况,双方相同 分桶,并且不需要洗牌

t3.join(t2, Seq("key")).explain()

因此,双方都需要相同的分桶以获得最佳性能。

【讨论】:

  • v.好文章和总结。有趣的是,如果您通过 repartition() 提供正确数量的分区,您只能利用“其他”DataFrame 的分桶。遗憾的是,Spark 还不“知道”最优值是什么,但我相信这是有充分理由的。我绝对对“为什么”会这样感兴趣,尽管我不是在这里寻找答案。谢谢你:)
  • 明白你的意思,优化器越来越好,但远远落后于 oracle 等人。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-17
相关资源
最近更新 更多