一、HashShuffle

1.1 未优化的 HashShuffle

spark shuffle

步骤

  1. 每个 Mapper Task 都按照 Reducer Task 的数量 n 把不同的 key 对应的数据先写到 n 个 buffer 中,如果 buffer 达到阈值,就溢出到文件中;
  2. Reducer Task 抓取所有 Mapper Task 产生的文件。

缺点

  1. 会产生大量小文件,如果有一共有 1000 个 mapper task,500 个 reducer task, 那么产生的小文件个数就是 500000,这会导致大量耗时低效的 IO 操作;
  2. 内存不够用,由于内存中需要保存海量小文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

1.2 优化过的 HashShuffle

spark shuffle

步骤

  1. 每个 Executor 按照 Reducer Task 的数量 n 把不同的 key 对应的数据先写到 n 个缓存中,如果缓存达到阈值,就溢出到文件中;
  2. Reducer Task 抓取所有 Executor 产生的文件。

缺点:还是会产生大量的小文件

二、SortedShuffle

1.1 普通的 SortedShuffle

spark shuffle
步骤

  1. 每个 MapTask 都会把数据先写到缓冲区中,在溢出的时候按照 partitionID 和 partition 内的数据进行排序,然后将排序好的数据写到磁盘,生成一个 data 文件;
  2. 每个 data 文件都对应对一个 index 文件,reducer 解析 index 文件来读取自己在 data 文件中对应的内容,这样一来,文件规模就是 2 * mapTask。

缺点

  1. 如果 Mapper 中的 Task 的数量过大,依旧回产生大量小文件,此时在 Shuffle 传数据的过程中到 Reducer 端,Reducer 会大量的记录来反序列化(就是从 index 到 data 的映射过程),导致大量的内存消耗;
  2. 基于记录本身进行排序,导致性能消耗。

1.2 byPass 机制的 SortedShuffle

​ 它将数据记录用二进制的方式存储,直接在序列化的二进制数据上 sort 而不是在 java 对象上,这样一方面可以减少 memory 的使用和 GC 的开销,另一方面避免 shuffle 过程中频繁的序列化以及反序列化。在排序过程中,使用一个 8 bytes 的指针,将排序转成一个指针数组的排序,极大的优化了性能。

在一定条件下不需要对记录本身进行排序,条件:

  1. 没有 aggregate 操作,因为 aggregate 操作就是要在 map 阶段把所有 key 相同的数据放到一起;
  2. Mapper 端的 partition 大于 2^24 - 1 = 16,777,215,这是可编码的最大 partition id。

三、几个常见问题

  • 什么时候开始 fetch 数据

    当 parent stage 的所有 shuffleMapTasks 结束后再 fetch(这里和 MR 不同)。理论上讲,一个 shuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 的 parent stages 没有执行完,自己是不能被提交的)

  • 边 fetch 边处理还是一次性 fetch 完再处理

    因为 spark 不要求全局有序,所以是边 fetch 边处理的(MR 是边 fetch 边 combine,只是 combine 处理的是部分数据,为了让进入 reducer 的数据有序,必须等到所有数据都 shuffle-sort 之后再 reduce。)

  • fetch 来的数据存放到哪里

相关文章: