文章目录
SparkShuffle概念
reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合的value
问题:
聚合之前,每一个 key 对应的 value 不一定都是在一个 partition中,也不太可能在同一个节点上,因为 RDD 是分布式的弹性的数据集,RDD 的 partition 极有可能分布在各个节点上
如何聚合
Shuffle Write :
上一个 stage 的每个 map task 就必须保证将自己处理的当前分区的数据相同的 key 写入一个分区文件中,可能会写入多个不同的分区文件中。
Shuffle Read :
reduce task 就会从上一个 stage 的所有 task 所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个 key 所对应的 value 都会汇聚到同一个节点上去处理和聚合。
HashShuffle
默认的分区器是 HashPartitioner
普通机制
执行流程
a.每一个map task 将不同的结果写到不同的buffer中,每个buffer大小为32k,buffer起到数据缓冲的作用
b.每个buffer文件最后对应一个磁盘小文件
c.reduce task来拉取对应的磁盘小文件
总结
① map task的计算结果会根据分区器(默认hihashPartitioner)来决定写入到哪个磁盘小文件中去,ReduceTask会去Map端拉取相应的磁盘小文件
②产生的磁盘小文件个数 : M*R
存在问题
a.在 Shuffle Write 过程中会产生很多写磁盘小文件的对象
b.在 Shuffle Read 过程中会产生很多读取磁盘小文件的对象。
c.在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存 的话,就会 OOM。
d.数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致 shuffle file cannot find 由于这个错误导致的 task 失败,TaskScheduler 不负责重试,由 DAGScheduler 负责重试 Stage。
合并机制
总结
产生磁盘小文件的个数:C(core 的个数)*R(reduce 的个数)
SortShuffle
普通机制
执行流程
a. map task的计算结果会写入到一个内存数据结构里,内存数据结构默认是5M
b.在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M,结构中的数据为 5.01M,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构。
c.如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
d.溢写之前的内存结构中的是数据会进行排序分区
e.然后开始溢写磁盘,写磁盘是以batch形式去写,一个batch是1w条数据
f.map task执行完成后,将磁盘中小文件合并成一个大的磁盘文件和一个索引文件
g.reduce task去map端拉取数据的时候,首先解析索引文件,然后根据索引文件再去拉取对应的数据
总结
产生的磁盘小文件个数 : 2* M
bypass 机制
总结
shuffle的 reduce task 数量小于 spark.shuffle.sort.bypassMergeThreshold 默认200
产生的磁盘小文件为:2*M(map task 的个数)