Mapreduce工作全流程

mapreduce工作三大核心问题

1.图示

Mapreduce工作全流程(流程详解)

  • 读数据到底是怎么回事?
  • shuffle到底是什么?
  • 结果数据到底是怎么回事?

mapreduce的工作流程

1.读数据

1.1对文件切片产生的问题的解决

  • 图示

Mapreduce工作全流程(流程详解)

  • 理解:当文件进行切片时,有可能会把单词且分开比如:hello切分为he和llo

    为了能够完整的,不出错的统计每一个单词的出现。有以下解决方案

  • 解决思想:因为默认的读取操作是,默认读取文件的一行数据,读取器不会管你是不是在这一行切开了。根据这个特性来进行框架规定了解决方案。

  • 解决方案(读取规则)

    • 每个maptask都向下多读一行
    • 每一个maptask都要抛弃读取的第一行数据
    • 第一个maptask不需要抛弃第一行
    • 最后一个maptask不往下多读一行

1.2图示

Mapreduce工作全流程(流程详解)

1.3读文件流程

  • maptask调用TextInputFormat.
    • TextInputFormat:指定去读什么文件,比如:去文本文档中,数据库等
  • TextInputFormat调用createLineRecordReader方法后,会生成一个Reader(具体怎么去读
  • 生成了一个LineRecordReader对象,按行读
    • nextkeyvalue
    • getCurrentkey
    • getCurrentvalue
  • maptask调用nextkeyvalue方法
  • maptask调用getCurrentkey,getCurrentvalue获取到k,v的键值对数据
  • 调用用户继承Mapper类重写map方法的类
  • 将kv键值对交给map方法
  • 由map对数据进行处理

2.shuffle

2.1图示

Mapreduce工作全流程(流程详解)

2.2shuffle流程

  • map处理完数据后不断写出

  • 此时outputCollectot,会收集map写出的数据,并放入环形缓冲区。

  • 环形缓冲区:默认是100M,分为两个区域数据区(80%)保留区(20%)

    • 首先,map写出的数据会被outputcollector收集进入数据区,渐渐的数据区存储饱和,此时开启溢写(见下文)

    • 注意点:

      • 数据区会有三个数据来记录索引:顺序,大小,偏移量等

      • 当80%的数据区被写满之后数据区会被锁定,此时outputcollector收集的数据会写入20%的保留区。当数据区的数据溢写完成后,锁定解除。

        此时的20%的保留区加上数据区的60%构成新的数据区,数据区剩余的20%为新的保留区

      • 保留区与数据区的比例是可以由用户决定的。

  • 溢写:当环形缓冲的数据区存满之后,会开启溢写生成溢出器spiller

    • 首先会对数据区的数据进行分区和排序

      • 分区(Partitioner)

      • MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

        默认是按照HASHPARTITIONER来进行排序的。

        具体方法为getPartition(k,v,nums(reducetask的数量))

        具体实现:key%reducetask的数量,这样就能平均分配每个reducetask的处理的数据量

      • 排序(key.compareTo):调用key的compareTo方法进行比较,默认是快排,字典排序。

      • Combiner:是mapreduce的组件,目的:进行局部汇总。工作机制与reduce完全一致。由用户设置使不使用。(下文具体讲述)

    • 分区排序完成之后,数据区的数据会通过溢出器,溢出到本地临时文件。

    • 注意点:

      • 为什么会生成多个小文件:每当数据存满之后,会将整个数据区的分区排序好之后,写入到一个文件当中。多次累计,就得到了很多小文件。

      • 当最后map处理完所有数据后,数据区如果没有触发溢写怎么办?

        也会将内存中的数据分区排序后写入到磁盘中。

      • 每个文件都会有一个index索引文件(用于描述区的偏移量

  • merge(整合)

    • 将所有的溢出文件中,同一个区的所有数据合并到同一分区归并排序生成一个大文件。
    • 注意点:
      • 一个maptask最后会整合成一个大文件,大文件也会有索引
      • merge的过程中combiner也会起作用
  • reducetask拉取

    • reducetask会去每个maptask上拉取对应分区的文件,比如,rt-01会去拉取01分区的文件。
    • 拉取完毕后,会得到一堆小文件
    • 对所有的小文件,进行merge,归并排序成一个大文件
  • 到此为止shuffle结束

2.3注意点

  • 整个shuffle的过程,
    • 小文件如何生成的?
    • 分区,排序,combine是在缓存中进行的
      • 分区的规则是什么。
      • 排序的默认算法。
    • merge是按照什么规则进行的。
    • reducetask拉取到的小文件是什么

3.reduce的执行以及写出

3.1图示

Mapreduce工作全流程(流程详解)

3.2reduce的过程

  • 整合成的大文件,由groupingComparetor规定怎样分组,

    • groupingComparetor:负责判断key的分组。
  • 将key相同的一组的key,和reducetask的迭代器,上下文管理器context传递给reduce,开始工作。

  • 迭代器的工作原理

    • 注意:这里的迭代器是框架重写hasnext和next之后的迭代器

    • 这个迭代器,默认一个键值对是一个元素。

    • hasnext()重写后的功能:用于判断当前指向的元素的key值是否和正在reduce的key值是否相同。如果相同则可以指向下一个

    • next():重写后的功能。用于读取键值对的值

    • 工作:

      • 迭代器会对整个大文件进行遍历。
      • hasnext指向第一个,当下一个元素的key值与当前reduce的key不相等事,返回false,此时,本次的reduce的逻辑结束。
      • 开启下一次的reduce时,hasnext的指针停留于上次reduce结束的地方
    • 看起来的效果

      reducetask会事先把数据按照key进行分组,然后对每一组数据调用一次reduce方法

  • 将结果输出给TextOutputFormat(指向输出到哪里

  • lineRecordWriter,实现具体怎么写的对象

4.mapreduce运行全流程—概略图

Mapreduce工作全流程(流程详解)

  • 不多解释了,累死我了

mapreduce流程中可干预的流程

1.map端可干预

  • inputformat:可以规定从哪里读入数据,比如:txt,数据库等
  • recordreader:可以规定如何读数据,比如==:按行读,多行读等==
  • partitioner:分区,默认是hashPartitioner。规则见上文。
  • combiner组件
    • 用于局部汇总
    • 工作机制完全跟reduce组件一致
    • combiner组件可能会调用一次或者多次(merge时也可以调用)
    • combiner组件不能影响最终的结果
  • 排序:例如:缓存中的快排,merge的归并排序

2.reduce端可干预

  • outputformat:规定输出到哪里
  • recordWriter:规定怎么写出
  • GroupingComparator:用于reducetask端合并的大文件中,如何区分哪些key是一组的。

3.GroupingComparator的使用案例

  • 图示

Mapreduce工作全流程(流程详解)

  • 将键值对存入类中后,需要的处理。

    • 需要保证orderid号相同的数据全部发往同一个reducetask,需要实现自定义的Partitioner,分区逻辑:根据bean的orderid进行分区
    • 还需要保证数据按照订单和金额大小进行排序,所以需要改写javabean的compareTo方法,比较逻辑:先按照订单号排序,再按照金额倒序排列
    • 需要保证同订单的数据能够分成一组,需要改写groupingcomparator对象,分组比较逻辑:只要对象的订单号相同,就认为两个对象相等。
  • 自己的理解:当求最大值时,如果数据过多,我们在reduce中迭代的越多,效率会变低。如果能在整合的时候将顺序排好,这样效率会大大提高。

  • 首先定义了javabean去存储键值对,首先遇到了第一个问题,我们如何将key一样的分到一个分区内,因为这时候存储的是对象,并不会去比较对象内部的key。

    因此,此时用到了上文提出的可干预项Partitioner,这时候我们自己去定义getPartition方法,用我们自己定义的分区方法来实现分区,这样就可以人为的将我们想要在一起的数据,让他们在一个分区里了。

  • 其次数据在排序时的算法,他会默认去调用类的CompareTo方法,因此,我们只需要在CompareTo方法中定义我们自己的排序方法,比如订单和金额,我们定义订单相同的数据再比较金额,就能得到一组一组的订单。

  • 以上工作哦完成后,最后一个问题,如何让reduce知道或者让迭代器知道,我这两个对象是具有相同的key,这时候我们需要去自己定义groupingcomparator来实现,例如规定对象内的key属性相同的我们就认为两个对象相等。类似于没有重写equals之前,我们两个对象是不想等的,重写equals之后,我们可以用我们的规则去比较两个对象,让他们两个相等。

相关文章: