SparkShuffle概念

reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合的value

问题:

聚合之前,每一个 key 对应的 value 不一定都是在一个 partition中,也不太可能在同一个节点上,因为 RDD 是分布式的弹性的数据集,RDD 的 partition 极有可能分布在各个节点上

如何聚合

Shuffle Write :

上一个 stage 的每个 map task 就必须保证将自己处理的当前分区的数据相同的 key 写入一个分区文件中,可能会写入多个不同的分区文件中。

Shuffle Read :

reduce task 就会从上一个 stage 的所有 task 所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个 key 所对应的 value 都会汇聚到同一个节点上去处理和聚合。

HashShuffle

默认的分区器是 HashPartitioner

普通机制

SparkShuffle的基本机制

执行流程

a.每一个map task 将不同的结果写到不同的buffer中,每个buffer大小为32k,buffer起到数据缓冲的作用

b.每个buffer文件最后对应一个磁盘小文件

c.reduce task来拉取对应的磁盘小文件

总结

① map task的计算结果会根据分区器(默认hihashPartitioner)来决定写入到哪个磁盘小文件中去,ReduceTask会去Map端拉取相应的磁盘小文件

②产生的磁盘小文件个数 : M*R

存在问题

a.在 Shuffle Write 过程中会产生很多写磁盘小文件的对象

b.在 Shuffle Read 过程中会产生很多读取磁盘小文件的对象。

c.在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存 的话,就会 OOM。

d.数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致 shuffle file cannot find 由于这个错误导致的 task 失败,TaskScheduler 不负责重试,由 DAGScheduler 负责重试 Stage。

合并机制

SparkShuffle的基本机制

总结

产生磁盘小文件的个数:C(core 的个数)*R(reduce 的个数)

SortShuffle

普通机制

SparkShuffle的基本机制

执行流程

a. map task的计算结果会写入到一个内存数据结构里,内存数据结构默认是5M

b.在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M,结构中的数据为 5.01M,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构。

c.如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。

d.溢写之前的内存结构中的是数据会进行排序分区

e.然后开始溢写磁盘,写磁盘是以batch形式去写,一个batch是1w条数据

f.map task执行完成后,将磁盘中小文件合并成一个大的磁盘文件和一个索引文件

g.reduce task去map端拉取数据的时候,首先解析索引文件,然后根据索引文件再去拉取对应的数据

总结

产生的磁盘小文件个数 : 2* M

bypass 机制

SparkShuffle的基本机制

总结

shuffle的 reduce task 数量小于 spark.shuffle.sort.bypassMergeThreshold 默认200

产生的磁盘小文件为:2*M(map task 的个数)

相关文章:

  • 2021-11-27
  • 2022-12-23
  • 2021-11-06
  • 2022-12-23
  • 2022-12-23
  • 2021-11-30
  • 2021-09-10
  • 2021-09-11
猜你喜欢
  • 2022-01-01
  • 2021-05-25
  • 2021-11-26
  • 2021-06-27
  • 2021-12-27
  • 2022-02-17
相关资源
相似解决方案