shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

shuffle的目的是以下三点:

  • 完整地从map task端读取数据到reduce 端。
  • 在跨节点读取数据时,尽可能地减少对带宽的不必要消耗。
  • 减少磁盘IO对task执行的影响。

map reduce原理

在进入map这一步之前,首先是split(分片),按输入文件大小和数量切分多个片,每一个片对应一个maptask。

 

MR过程中共涉及三次排序,每一步如下

InputFormat

默认一般使用FileInputFormat,HDFS文件做为输入,一行行处理(默认是TextOutputFormat)

是一个接口,可以有多种实现方式,如:数据库、HTTP、FTP等

需要实现Split、RecordReader等接口。InputFormat实现了split的机制,此时也决定了map的数量

Split切片

map task的并发量由切片的数量决定,有多少切片就会有多少map task

切片是逻辑概念,指的是文件中数据的偏移量范围

切片的具体大小应该是根据要处理的文件大小来调整的

MAP阶段

缓存、分组(partition)、排序、combiner、合并溢写(需要排序)

在读取数据过程中会首先写到对应的环形内存缓冲区,当达到阀值(默认写到缓冲区80%)时会溢写磁盘

io.sort.spill.percent可以设置阀值

mapred.local.dir溢写路径,设置多路径时有助于利用磁盘i/o

分组partition

它分割map每个节点的结果,按照key分别映射给不同的reduce。也可以理解为根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。partition的作用就是把这些数据归类。每个map任务会针对输出进行分区,及对每一个reduce任务建立一个分区。划分分区由用户定义的partition函数控制,默认使用哈希函数来划分分区。

(1)计算(key,value)所属与的分区

当map输出的时候,写入缓存之前,会调用partition函数,计算出数据所属的分区,并且把这个元数据存储起来。

(2)把属与同一分区的数据合并在一起

当数据达到溢出的条件时(即达到溢出比例,启动线程准备写入文件前),读取缓存中的数据和分区元数据,然后把属与同一分区的数据合并到一起。

combiner

为了减少map到reduce间传输的数据量,提高传输效率,可以使用此方式进行优化,在map端把同一个key的键值对合并在一起并计算,计算规则同reduce,故等同于reduce过程。

当map输出数据根据分区排序完成后,写入文件前会执行一次combine操作

如果map输出比较大,溢出文件个数大于3(此值可以通过属性min.num.spills.for.combine配置)时,在merge的过程(多个spill文件合并为一个大文件)中前还会执行combiner操作

注意
combine只用于reduce输入k/v和输出k/v类型完成相同情况
经过combine不会影响最终计算结果,如求和、最大值,但平均值会受影响

copy、排序、合并

经http文件传输,再次合并排序,做为reduce的输入

reduce阶段

OutputFormat

默认使用FileOutputFormat,HDFS文件做为输出,是一个接口

 

 

相关文章: