Mapreduce工作全流程
mapreduce工作三大核心问题
1.图示
- 读数据到底是怎么回事?
- shuffle到底是什么?
- 结果数据到底是怎么回事?
mapreduce的工作流程
1.读数据
1.1对文件切片产生的问题的解决
- 图示
-
理解:当文件进行切片时,有可能会把单词且分开比如:hello切分为he和llo
为了能够完整的,不出错的统计每一个单词的出现。有以下解决方案
-
解决思想:因为默认的读取操作是,默认读取文件的一行数据,读取器不会管你是不是在这一行切开了。根据这个特性来进行框架规定了解决方案。
-
解决方案(读取规则):
- 每个maptask都向下多读一行
- 每一个maptask都要抛弃读取的第一行数据
- 第一个maptask不需要抛弃第一行
- 最后一个maptask不往下多读一行
1.2图示
1.3读文件流程
- maptask调用TextInputFormat.
- TextInputFormat:指定去读什么文件,比如:去文本文档中,数据库等
- TextInputFormat调用createLineRecordReader方法后,会生成一个Reader(具体怎么去读)
-
生成了一个LineRecordReader对象,按行读
- nextkeyvalue
- getCurrentkey
- getCurrentvalue
- maptask调用nextkeyvalue方法
- maptask调用getCurrentkey,getCurrentvalue获取到k,v的键值对数据
- 调用用户继承Mapper类重写map方法的类
- 将kv键值对交给map方法
- 由map对数据进行处理
2.shuffle
2.1图示
2.2shuffle流程
-
map处理完数据后不断写出
-
此时outputCollectot,会收集map写出的数据,并放入环形缓冲区。
-
环形缓冲区:默认是100M,分为两个区域数据区(80%),保留区(20%)
-
首先,map写出的数据会被outputcollector收集进入数据区,渐渐的数据区存储饱和,此时开启溢写(见下文)。
-
注意点:
-
数据区会有三个数据来记录索引:顺序,大小,偏移量等
-
当80%的数据区被写满之后,数据区会被锁定,此时outputcollector收集的数据会写入20%的保留区。当数据区的数据溢写完成后,锁定解除。
此时的20%的保留区加上数据区的60%构成新的数据区,数据区剩余的20%为新的保留区
-
保留区与数据区的比例是可以由用户决定的。
-
-
-
溢写:当环形缓冲的数据区存满之后,会开启溢写,生成溢出器spiller。
-
首先会对数据区的数据进行分区和排序
-
分区(Partitioner)
-
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
默认是按照HASHPARTITIONER来进行排序的。
具体方法为getPartition(k,v,nums(reducetask的数量))
具体实现:key%reducetask的数量,这样就能平均分配每个reducetask的处理的数据量
-
排序(key.compareTo):调用key的compareTo方法进行比较,默认是快排,字典排序。
-
Combiner:是mapreduce的组件,目的:进行局部汇总。工作机制与reduce完全一致。由用户设置使不使用。(下文具体讲述)
-
-
分区排序完成之后,数据区的数据会通过溢出器,溢出到本地临时文件。
-
注意点:
-
为什么会生成多个小文件:每当数据存满之后,会将整个数据区的分区排序好之后,写入到一个文件当中。多次累计,就得到了很多小文件。
-
当最后map处理完所有数据后,数据区如果没有触发溢写怎么办?
也会将内存中的数据分区排序后写入到磁盘中。
-
每个文件都会有一个index索引文件(用于描述区的偏移量)
-
-
-
merge(整合)
- 将所有的溢出文件中,同一个区的所有数据合并到同一分区,归并排序生成一个大文件。
- 注意点:
- 一个maptask最后会整合成一个大文件,大文件也会有索引
- merge的过程中combiner也会起作用
-
reducetask拉取
- reducetask会去每个maptask上拉取对应分区的文件,比如,rt-01会去拉取01分区的文件。
- 拉取完毕后,会得到一堆小文件。
- 对所有的小文件,进行merge,归并排序成一个大文件
-
到此为止shuffle结束
2.3注意点
- 整个shuffle的过程,
- 小文件如何生成的?
- 分区,排序,combine是在缓存中进行的
- 分区的规则是什么。
- 排序的默认算法。
- merge是按照什么规则进行的。
- reducetask拉取到的小文件是什么?
3.reduce的执行以及写出
3.1图示
3.2reduce的过程
-
整合成的大文件,由groupingComparetor规定怎样分组,
- groupingComparetor:负责判断key的分组。
-
将key相同的一组的key,和reducetask的迭代器,上下文管理器context传递给reduce,开始工作。
-
迭代器的工作原理
-
注意:这里的迭代器是框架重写hasnext和next之后的迭代器。
-
这个迭代器,默认一个键值对是一个元素。
-
hasnext()重写后的功能:用于判断当前指向的元素的key值是否和正在reduce的key值是否相同。如果相同则可以指向下一个。
-
next():重写后的功能。用于读取键值对的值
-
工作:
- 迭代器会对整个大文件进行遍历。
- hasnext指向第一个,当下一个元素的key值与当前reduce的key不相等事,返回false,此时,本次的reduce的逻辑结束。
- 开启下一次的reduce时,hasnext的指针停留于上次reduce结束的地方
-
看起来的效果
reducetask会事先把数据按照key进行分组,然后对每一组数据调用一次reduce方法
-
-
将结果输出给TextOutputFormat(指向输出到哪里)
-
lineRecordWriter,实现具体怎么写的对象
4.mapreduce运行全流程—概略图
- 不多解释了,累死我了。
mapreduce流程中可干预的流程
1.map端可干预
- inputformat:可以规定从哪里读入数据,比如:txt,数据库等
- recordreader:可以规定如何读数据,比如==:按行读,多行读等==
- partitioner:分区,默认是hashPartitioner。规则见上文。
-
combiner组件
- 用于局部汇总
- 工作机制完全跟reduce组件一致
- combiner组件可能会调用一次或者多次(merge时也可以调用)
- combiner组件不能影响最终的结果
- 排序:例如:缓存中的快排,merge的归并排序
2.reduce端可干预
- outputformat:规定输出到哪里
- recordWriter:规定怎么写出
- GroupingComparator:用于reducetask端合并的大文件中,如何区分哪些key是一组的。
3.GroupingComparator的使用案例
- 图示
-
将键值对存入类中后,需要的处理。
- 需要保证orderid号相同的数据全部发往同一个reducetask,需要实现自定义的Partitioner,分区逻辑:根据bean的orderid进行分区
- 还需要保证数据按照订单和金额大小进行排序,所以需要改写javabean的compareTo方法,比较逻辑:先按照订单号排序,再按照金额倒序排列
- 需要保证同订单的数据能够分成一组,需要改写groupingcomparator对象,分组比较逻辑:只要对象的订单号相同,就认为两个对象相等。
-
自己的理解:当求最大值时,如果数据过多,我们在reduce中迭代的越多,效率会变低。如果能在整合的时候将顺序排好,这样效率会大大提高。
-
首先定义了javabean去存储键值对,首先遇到了第一个问题,我们如何将key一样的分到一个分区内,因为这时候存储的是对象,并不会去比较对象内部的key。
因此,此时用到了上文提出的可干预项Partitioner,这时候我们自己去定义getPartition方法,用我们自己定义的分区方法来实现分区,这样就可以人为的将我们想要在一起的数据,让他们在一个分区里了。
-
其次数据在排序时的算法,他会默认去调用类的CompareTo方法,因此,我们只需要在CompareTo方法中定义我们自己的排序方法,比如订单和金额,我们定义订单相同的数据再比较金额,就能得到一组一组的订单。
-
以上工作哦完成后,最后一个问题,如何让reduce知道或者让迭代器知道,我这两个对象是具有相同的key,这时候我们需要去自己定义groupingcomparator来实现,例如规定对象内的key属性相同的我们就认为两个对象相等。类似于没有重写equals之前,我们两个对象是不想等的,重写equals之后,我们可以用我们的规则去比较两个对象,让他们两个相等。