【问题标题】:Why broadcast join collect data to driver in order to shuffle data?为什么广播连接收集数据给驱动程序以打乱数据?
【发布时间】:2021-08-12 18:45:59
【问题描述】:
我了解广播优化的概念。
当连接中的一方有小数据时,最好只为小的一方进行洗牌。
但是为什么不能只使用执行器来进行洗牌呢?为什么我们需要使用驱动程序?
如果每个执行者都持有哈希表来映射执行者之间的记录,我认为它应该可以工作。
在当前的 spark 广播实现中 - 它收集数据给驱动程序,然后对其进行随机播放,对驱动程序的收集操作是我想避免的瓶颈。
关于如何在没有驱动程序内存瓶颈的情况下实现类似优化的任何想法?
【问题讨论】:
标签:
apache-spark
join
pyspark
apache-spark-sql
【解决方案1】:
您是对的,当前的实现需要先将数据收集到驱动程序,然后再将其发送给执行器。
已经有一张 JIRA 票 SPARK-17556 准确地解决了您的提议:
“目前在 Spark SQL 中,为了执行广播连接,驱动程序必须收集 RDD 的结果然后广播它。这会引入一些额外的延迟。可能可以直接从执行程序广播。”
我从attached document 复制了建议的解决方案,以使这个答案具有自我描述性:
"要在RDD中添加一个广播方法来从执行器执行广播,我们需要一些支持工作如下:
- 从驱动程序构造 BroadCastId,BroadCastManager 将提供执行此操作的方法。
// Called from driver to create new broadcast id
def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
-
BroadCastManager可以创建一个指定id和一个persist标签的广播来推断这个广播是执行者广播,它的数据会备份到hdfs上。
-
在 TorrentBroadcast.writeBlocks 中将 block 写入 hdfs,readBlocks 从 local、remote、hdfs 中读取 block 的优先级。
-
在构建Broadcast时,可以控制是否上传广播数据块
-
BroadCastManager发布一个将广播数据放到区块管理器的api