一、HashShuffle
1.1 未优化的 HashShuffle
步骤:
- 每个 Mapper Task 都按照 Reducer Task 的数量 n 把不同的 key 对应的数据先写到 n 个 buffer 中,如果 buffer 达到阈值,就溢出到文件中;
- Reducer Task 抓取所有 Mapper Task 产生的文件。
缺点:
- 会产生大量小文件,如果有一共有 1000 个 mapper task,500 个 reducer task, 那么产生的小文件个数就是 500000,这会导致大量耗时低效的 IO 操作;
- 内存不够用,由于内存中需要保存海量小文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。
1.2 优化过的 HashShuffle
步骤:
- 每个 Executor 按照 Reducer Task 的数量 n 把不同的 key 对应的数据先写到 n 个缓存中,如果缓存达到阈值,就溢出到文件中;
- Reducer Task 抓取所有 Executor 产生的文件。
缺点:还是会产生大量的小文件
二、SortedShuffle
1.1 普通的 SortedShuffle
步骤:
- 每个 MapTask 都会把数据先写到缓冲区中,在溢出的时候按照 partitionID 和 partition 内的数据进行排序,然后将排序好的数据写到磁盘,生成一个 data 文件;
- 每个 data 文件都对应对一个 index 文件,reducer 解析 index 文件来读取自己在 data 文件中对应的内容,这样一来,文件规模就是 2 * mapTask。
缺点:
- 如果 Mapper 中的 Task 的数量过大,依旧回产生大量小文件,此时在 Shuffle 传数据的过程中到 Reducer 端,Reducer 会大量的记录来反序列化(就是从 index 到 data 的映射过程),导致大量的内存消耗;
- 基于记录本身进行排序,导致性能消耗。
1.2 byPass 机制的 SortedShuffle
它将数据记录用二进制的方式存储,直接在序列化的二进制数据上 sort 而不是在 java 对象上,这样一方面可以减少 memory 的使用和 GC 的开销,另一方面避免 shuffle 过程中频繁的序列化以及反序列化。在排序过程中,使用一个 8 bytes 的指针,将排序转成一个指针数组的排序,极大的优化了性能。
在一定条件下不需要对记录本身进行排序,条件:
- 没有 aggregate 操作,因为 aggregate 操作就是要在 map 阶段把所有 key 相同的数据放到一起;
- Mapper 端的 partition 大于 2^24 - 1 = 16,777,215,这是可编码的最大 partition id。
三、几个常见问题
-
什么时候开始 fetch 数据
当 parent stage 的所有 shuffleMapTasks 结束后再 fetch(这里和 MR 不同)。理论上讲,一个 shuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 的 parent stages 没有执行完,自己是不能被提交的)
-
边 fetch 边处理还是一次性 fetch 完再处理
因为 spark 不要求全局有序,所以是边 fetch 边处理的(MR 是边 fetch 边 combine,只是 combine 处理的是部分数据,为了让进入 reducer 的数据有序,必须等到所有数据都 shuffle-sort 之后再 reduce。)
-
fetch 来的数据存放到哪里