shuffle机制


将map输出作为输入传递给reducer的过程称为shuffle。

Shuffle过程包含在Map和Reduce两端

  map阶段大致过程为
    写数据,分区,排序,将属于同一分区的输出合并一起写在磁盘上。

  每个map任务都有一个环形内存缓冲区用于存储任务输出。环形内存缓冲区默认大小为100M。

  map开始产生输出数据时,先将数据写入缓冲区中,当缓冲区中数据达到阈值(默认为0.8)时,就开始把数据溢出到本地磁盘,溢出的文件成为spill文件。溢出的过程中,map输出的数据会继续写入缓冲区,如果此时缓冲区已经满了,那么map会等待缓冲区有空间时再进行写入。

  在将缓冲区的数据写入到磁盘之前,首先会根据数据最终要传的reducer对缓冲区的数据进行分区,在每个分区中,数据按key进行排序(快速排序)。如果有combiner函数,则会在排序后的输出上运行,使得map的输出结果更加紧凑。

  把所有溢出的文件进行一次合并,合并成一个已分区且已排序的输出文件。

  reduce阶段大致过程为
    复制,合并。

  reducer通过HTTP得到输出文件的分区。

  reduce有少量的复制线程(默认是5个),可以并行的从map上复制数据。

  Reduce可能需要从多个map任务中获取数据,因此只要多个map中的一个完成,reduce便可以从map复制数据。如果map的输出数据较小,会直接复制到内存;否则,map输出被复制到磁盘。当内存缓冲区达到阈值或达到map输出阈值时,合并后溢出写出到磁盘。如果指定combiner,那么会在合并期间运行它以降低写入磁盘的数据量。

  最后会排序合并这些溢出文件。

简单总结:

  先把数据写入到缓冲区,每次写入之前要计算分区号,当要溢写时,会对数据进行排序,根据key进行排序(可以自己定义按什么来排序),溢写时如果设置了combiner(适合加减场景),会先对溢写的数据进行合并,每次溢写会生成spill文件,将多个spill文件进行合并,合并成总的文件,合并时如果片段数量大于等于3,这个时候combiner会继续运行,如果不大于则不会运行。在maptask输出时,还可以指定将合并后的数据以压缩格式输出。最终将文件写入磁盘。

  Reduce阶段,通过shuffle线程拷贝指定分区的数据,拷贝后的数据会先在shuffle线程内存中进行合并,如果内存不够,也会进行多次溢写,最终对所有的数据进行一次归并排序。

=======================================================================

  1)环形缓冲区:

    排序方式:快排+字典序

    默认溢写阈值:80%

    默认大小:100M

    合理的调节环形缓冲区大小以及溢写阈值是一种常见的 MR 优化手段

  2)切片机制

    a) 简单地按照文件的内容长度进行切片

    b) 切片大小,默认等于 Block 大小

    c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

    切片大小公式:max(0,min(Long_max,blockSize))

=====================================================================

一、分区

  1. 分区是在MapTask中通过Partitioner来计算分区号

  2. Partitioner的初始化

    ①计算总的分区数partitions,取决于用户设置的reduceTask的数量

    ②partitions>1,默认尝试获取用户设置Partitioner,如果用户没有定义,那么会使用HashPartitioner

    HashPartitioner根据key的hashcode进行计算,相同的key以及hash值相同的key会分到一个区

    ②partitions<=1,默认初始化一个Partitioner,这个Partitioner计算的所有的区号都为0

  3. 注意

    通常在Job的设置中,希望将数据分为几个区,就设置reduceTask的数量为对应的数量!

    partitions=设置的reduceTask的数量,0<=分区器计算的区号 < partitions

二、排序

  1. 排序是MR框架在shuffle阶段自动进行

  2. 在MapTask端发生两次排序,在排序时,用户唯一可以控制的是提供一个key的比较器

  3. 设置key的比较器

    ①用户可以自定义key的比较器,自定义的比较器必须是一个RawComparator类型的类

      重点是实现compareTo()方法

    ②用户通过key,让key实现WritableComparable接口,系统自动提供一个比较器

    重点是实现compareTo()方法

  4. 排序的分类

    全排序: 对所有的数据进行排序,指生成一个结果文件,这个结果文件整体有序

    部分排序: 最终生成N个结果文件,每个文件内部整体有序

    二次排序: 在对key进行比较时,比较的条件为多个

    辅助排序: 在进入reduce阶段时,通过比较key是否相同,将相同的key分为1组

三、分组

  1. 分组通过分组比较器,对进入reduce的key进行对比,key相同的分为一组,一次性进入Reducer,被调用reduce方法

  2. 分组比较器的设置

    ①用户可以自定义key的分组比较器,自定义的比较器必须是一个RawComparator类型的类

      重点是实现compareTo()方法

    ②如果没有设置key的分组比较器,默认采取在Map阶段排序时,key的比较器

  3. Reduce的细节

    在进入reduce(),Reducer会自动实例化一个key,value,这个key-value在Redcuer工作期间,一直是一个不变的对象

    每次迭代,reducer会把读到的新的key-value的属性值赋值给key-value!

四、Combiner

  1. Combiner的本质是一个Reducer,对key-value进行合并,Combiner的作用就是对map端的输出先做一次合并,以减少map和reduce结点之间的数据传输量,以提高网络IO性能。

  2. Combiner 和 Reducer的区别

    Combiner在shuffle阶段运行

    Reducer在reduce阶段运行

  3. Combiner适用于 +,-操作,不适合 *,/操作

  4. Combiner的运行时机

    在MapTask端: ①每次从缓冲区将数据溢写到磁盘之前,如果设置了Combiner,数据会被Combine之后再溢写到磁盘

    ②在MapTask最后的merge阶段,如果溢写的片段数据>=3,,如果设置了Combiner,在生成最终的数据时,也会先执行Combine之后再溢写到磁盘

    在ReduceTask端: ③shuffle线程从多个MapTask读取同一个分区的数据,之后进行合并,在合并时,如果shuffle所使用的内存不够,也会将部分数据临时溢写到磁盘,此时如果设置了Combiner,数据会被Combine之后再溢写到磁盘

  5. Combiner的本质目的是为了减少MR在运行期间的磁盘IO和网络IO

相关文章:

  • 2021-04-06
  • 2022-12-23
  • 2021-10-01
  • 2021-07-09
  • 2021-12-19
  • 2021-07-24
  • 2021-06-14
猜你喜欢
  • 2022-01-20
  • 2022-01-06
相关资源
相似解决方案