shuffle

MR确保每个reducer的输入都是按key排序的,系统执行排序,将map输入传给reduce的过程叫shuffle

MapReduce运行机制(三) shuffle和sort和shuffle调优

map端

1) map输出:不是直接写到磁盘,而是利用缓冲写到内存并进行预排序

2) shuffle

    > 每个map任务有环形内存缓冲区用于存储任务输出,到达80%时,后台线程开始将缓冲区内容溢出到磁盘,边溢出边写入,如果缓冲区写满,map会阻塞直到spill过程完成。溢出按照轮询方式写入

       mapreduce.task.io.sort.mb缓冲区大小,默认为100MB

       mapreduce.map.sort.spill.percent溢出阈值,默认0.80

       mapreduce.cluster.local.dir 溢出文件目录

    > 写磁盘之前,线程根据reducer划分partition,每个分区按键内存排序

    > 如果有combiner函数,则在partition输出运行,使map输出结果更紧凑

    > 缓冲区到阈值,创建spill file,map任务写完最后输出时,会有几个溢出文件,任务完成之前,合并为一个分区排序好的文件

       mapreduce.task.io.sort.factor控制最多合并多少流,默认10

    > 至少存在3个spill files, combiner会在输入上反复运行,且不影响输出结果

       mapreduce.map.combin.minspills设置至少几个spill files才combine

    > 将压缩后的map压缩写到磁盘,默认输出不压缩

       mapreduce.map.output.compress设置为true

       mapreduce.map.output.compress.codec指定压缩方式

    > reducer通过HTTP获取文件分区

       mapreduce.shuffle.max.threads 文件分区的工作线程,针对NM,不是task,默认0最大线程数时处理器数的2倍


reduce端

1) 复制阶段

    > map输出文件位于运行map任务的tasktracker的本地磁盘,在map任务完成时,reduce任务就开始复制其输出

       reduce任务有少量复制线程,并行取得map输出

       mapreduce.reduce.shuffle.parallelcopies 复制线程数目,默认5

    > 如果map输出很小,会被复制到reduce任务JVM内存,如果达到阈值,或达到map输出阈值,则合并后溢出到磁盘。

       mapreduce.reduce.shuffle.input.buffer.percent JVM缓冲区百分比

       mapreduce.reduce.shuffle.merge.percent    缓冲区阈值

        mapreduce.reduce.merge,inmem.thrshold    map输出阈值

2) 合并阶段(merge/排序阶段)

    合并map输出,维持顺序排序,循环进行,合并可以是内存和磁盘混合

       mapreduce.task.io.sort.factor,默认时10

3) reduce阶段

    数据写入reduce函数,输出中的每个键调用reduce函数,输出直接写到输出文件系统,如HDFS


shuffle调优

1) map调优

原则:

    > 尽量多的提供内存空间给shuffle的同时,确保map函数和reduce函数有足够内存运行

    > 避免多次溢出磁盘

2) reduce调优

原则:复制的map输出全部驻留在内存中,则性能最佳

相关文章: