shuffle
MR确保每个reducer的输入都是按key排序的,系统执行排序,将map输入传给reduce的过程叫shuffle
map端
1) map输出:不是直接写到磁盘,而是利用缓冲写到内存并进行预排序
2) shuffle
> 每个map任务有环形内存缓冲区用于存储任务输出,到达80%时,后台线程开始将缓冲区内容溢出到磁盘,边溢出边写入,如果缓冲区写满,map会阻塞直到spill过程完成。溢出按照轮询方式写入
mapreduce.task.io.sort.mb缓冲区大小,默认为100MB
mapreduce.map.sort.spill.percent溢出阈值,默认0.80
mapreduce.cluster.local.dir 溢出文件目录
> 写磁盘之前,线程根据reducer划分partition,每个分区按键内存排序
> 如果有combiner函数,则在partition输出运行,使map输出结果更紧凑
> 缓冲区到阈值,创建spill file,map任务写完最后输出时,会有几个溢出文件,任务完成之前,合并为一个分区排序好的文件
mapreduce.task.io.sort.factor控制最多合并多少流,默认10
> 至少存在3个spill files, combiner会在输入上反复运行,且不影响输出结果
mapreduce.map.combin.minspills设置至少几个spill files才combine
> 将压缩后的map压缩写到磁盘,默认输出不压缩
mapreduce.map.output.compress设置为true
mapreduce.map.output.compress.codec指定压缩方式
> reducer通过HTTP获取文件分区
mapreduce.shuffle.max.threads 文件分区的工作线程,针对NM,不是task,默认0最大线程数时处理器数的2倍
reduce端
1) 复制阶段
> map输出文件位于运行map任务的tasktracker的本地磁盘,在map任务完成时,reduce任务就开始复制其输出
reduce任务有少量复制线程,并行取得map输出
mapreduce.reduce.shuffle.parallelcopies 复制线程数目,默认5
> 如果map输出很小,会被复制到reduce任务JVM内存,如果达到阈值,或达到map输出阈值,则合并后溢出到磁盘。
mapreduce.reduce.shuffle.input.buffer.percent JVM缓冲区百分比
mapreduce.reduce.shuffle.merge.percent 缓冲区阈值
mapreduce.reduce.merge,inmem.thrshold map输出阈值
2) 合并阶段(merge/排序阶段)
合并map输出,维持顺序排序,循环进行,合并可以是内存和磁盘混合
mapreduce.task.io.sort.factor,默认时10
3) reduce阶段
数据写入reduce函数,输出中的每个键调用reduce函数,输出直接写到输出文件系统,如HDFS
shuffle调优
1) map调优
原则:
> 尽量多的提供内存空间给shuffle的同时,确保map函数和reduce函数有足够内存运行
> 避免多次溢出磁盘
2) reduce调优
原则:复制的map输出全部驻留在内存中,则性能最佳