1.1MapReduce概述

  MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,其执行流程如图1。这两个函数的形参是key、value对,表示函数的输入信息。

Hadoop日记Day12---MapReduce学习图 1

1.1.1 map任务处理

<1> 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
<2> 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
<3> 对输出的key、value进行分区
<4> 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
<5> (可选)分组后的数据进行归约。

1.1.2 Reduce任务处理

<1> 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点
<2> 对多个map任务的输出进行合并排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
<3> 把reduce的输出保存到文件中。

1.2 Map和Reduce编程模型

  在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。我们要做的就是覆盖map 函数和reduce 函数。对于Hadoop 的map 函数和reduce 函数,处理的数据是键值对,也就是说map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。

1.2.1 Mapper类 

  现在看一下Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型。map 函数定义如下代码1.1。

1 protected void map(KEYIN key, VALUEIN value, 
2                      Context context) throws IOException, InterruptedException {
3     context.write((KEYOUT) key, (VALUEOUT) value);
4   }

代码 1.1

  在上面的代码中,输入参数key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map 函数。在这里,map 函数没有处理输入的key、value,直接通过context.write(…)方法输出了,输出的key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。

1.2.2 Reducer类

  再看一下Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型,和输出的key、value 类型。看一下reduce 函数定义,如下代码1.2。

1 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
2                         ) throws IOException, InterruptedException {
3     for(VALUEIN value: values) {
4       context.write((KEYOUT) key, (VALUEOUT) value);
5     }
6   }

代码 1.2

  在上面代码中,reduce 函数的形参key、value 的类型是KEYIN、VALUEIN。要注意这里的value 是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的key,通过调用context.write(…)输出了,这里输出的类型是KEYOUT、VALUEOUT。通常我们会根据业务逻辑覆盖reduce 函数的实现。

二、 MapReduce 执行原理

2.1 MapRduce执行流程  

  MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中。整个流程如图3.1

Hadoop日记Day12---MapReduce学习
图2.1

2.2 Mapper 任务的执行过程

  每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。整个Mapper 任务的处理过程又可以分为以下几个阶段,如图3.2

Hadoop日记Day12---MapReduce学习

图2.2

  在图3.2中,把Mapper 任务的运行过程分为六个阶段  


  第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper 进程处理。这里的三个输入片,会有三个Mapper 进程处理。

  第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对“键”是每一行的起始位置(单位是字节),“值”本行的文本内容

  第三阶段是调用Mapper 类中的map 方法。第二阶段中解析出来的每一个键值对,调用一次map 方法。如果有1000 个键值对,就会调用1000 次map 方法。每一次调用map 方法会输出零个或者多个键值对。map具体的工作做有我们自己来决定,我们要对map函数进行覆盖,封装我们要进行的操作来实现我们最终的目的。

  第四阶段是按照一定的规则对第三阶段的每个Mapper任务输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer 任务运行的数量。默认只有一个Reducer 任务。

  第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux 文件中。

  第六阶段是对数据进行归约处理,也就是reduce 处理。对于键相等的键值对才会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linxu 文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码

2.3 Reducer执行过程

  每个Reducer 任务是一个java 进程。Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,可以分为如图2.3 所示的几个阶段

Hadoop日记Day12---MapReduce学习

图2.3

  在图3.2中,把Mapper 任务的运行过程分为个阶段。

  第一阶段是Reducer 任务会主动从Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此Reducer 会复制多个Mapper 的输出。

  第二阶段是把复制到Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。

  第三阶段是对排序后的键值对调用reduce 方法。键相等的键值对调用一次reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS 文件中。

  在整个MapReduce 程序的执行过程中如图2.4,我可以根据上面的讲解来分析下面MapReducer执行过程,从下图可知每个Mapper任务分了两个区,因此会有两个Reducer任务,最终产生两个HDFS副本。

Hadoop日记Day12---MapReduce学习

图 2.4

2.4  键值对的编号

  在对Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,为避免混淆,所以我在这里对键值对进行编号,方便大家理解键值对的变化情况。如图2.5

Hadoop日记Day12---MapReduce学习

图 2.5

  在图2.5 中,对于Mapper 任务输入的键值对,定义为key1 和value1。在map 方法中处理后,输出的键值对,定义为key2 和value2。reduce 方法接收key2 和value2,处理后,输出key3 和value3。在下文讨论键值对时,可能把key1 和value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和value3 简写为<k3,v3>。

2.5 举例:单词计数

  该业务要求统计指定文件中的所有单词的出现次数。下面看一下源文件的内容为:

  “hello you”

  “hello me”

  内容很简单,两行文本,每行的单词中间使用空格区分。

  分析思路:最直观的想法是使用数据结构Map。解析文件中出现的每个单词,用单词作为key,出现次数作为value。这个思路没有问题,但是在大数据环境下就不行了。我们需要使用MapReduce 来做。根据Mapper 任务和Reducer任务的运行阶段,我们知道在Mapper任务的第二阶段是把文件的每一行转化成键值对,那么第三阶段的map 方法就能取得每一行文本内容,我们可以在map 方法统计本行文本中单词出现的次数,把每个单词的出现次数作为新的键值对输出。在Reducer 任务的第二阶段会对Mapper 任务输出的键值对按照键进行排序,键相等的键值对会调用一次reduce 方法。在这里,“键”就是单词,“值”就是出现次数。因此可以在reduce 方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把这个结果输出。

  程序源码如下代码 2.1。  

 1 package counter;
 2 
 3 import java.net.URI;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19 
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
23     
24     public static void main(String[] args) throws Exception {
25         Configuration conf = new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outPath = new Path(OUT_PATH);
28         if(fileSystem.exists(outPath)){
29             fileSystem.delete(outPath, true);
30         }
31         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
32         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
33         
34         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
35         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
36         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
37         job.setMapOutputValueClass(LongWritable.class);
38         
39         job.setPartitionerClass(HashPartitioner.class);//1.3 分区    
40         job.setNumReduceTasks(1);//有一个reduce任务运行
41         //1.4 TODO 排序、分组
42         //1.5 TODO 规约
43         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
44         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
45         job.setOutputValueClass(LongWritable.class);//2.3 指定写出到哪里
46         FileOutputFormat.setOutputPath(job, outPath);//指定输出文件的格式化类
47         
48         job.setOutputFormatClass(TextOutputFormat.class);
49         
50         job.waitForCompletion(true);//把job提交给JobTracker运行
51     }
52     
53     /**
54      * KEYIN    即k1        表示行的偏移量
55      * VALUEIN    即v1        表示行文本内容
56      * KEYOUT    即k2        表示行中出现的单词
57      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
58      */
59     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
60         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
61 
62             final String line = v1.toString();
63             final String[] splited = line.split(" ");
64             for (String word : splited) {
65                 context.write(new Text(word), new LongWritable(1));
66             }
67         };
68     }
69     
70     /**
71      * KEYIN    即k2        表示行中出现的单词
72      * VALUEIN    即v2        表示行中出现的单词的次数
73      * KEYOUT    即k3        表示文本中出现的不同单词
74      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
75      *
76      */
77     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
78         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
79             long times = 0L;
80             for (LongWritable count : v2s) {
81                 times += count.get();
82             }
83             ctx.write(k2, new LongWritable(times));
84         };
85     }
86         
87 }
View Code

相关文章:

  • 2021-08-29
  • 2021-07-21
  • 2022-12-23
  • 2022-12-23
  • 2021-04-26
  • 2021-06-03
  • 2021-11-26
  • 2022-01-09
猜你喜欢
  • 2021-08-04
  • 2021-12-30
  • 2021-08-17
  • 2021-03-31
  • 2022-12-23
  • 2021-11-25
  • 2021-12-06
相关资源
相似解决方案