1.1MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,其执行流程如图1。这两个函数的形参是key、value对,表示函数的输入信息。
图 1
1.1.1 map任务处理
<1> 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
<2> 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
<3> 对输出的key、value进行分区。
<4> 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
<5> (可选)分组后的数据进行归约。
1.1.2 Reduce任务处理
<1> 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
<2> 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
<3> 把reduce的输出保存到文件中。
1.2 Map和Reduce编程模型
在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。我们要做的就是覆盖map 函数和reduce 函数。对于Hadoop 的map 函数和reduce 函数,处理的数据是键值对,也就是说map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。
1.2.1 Mapper类
现在看一下Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型。map 函数定义如下代码1.1。
1 protected void map(KEYIN key, VALUEIN value, 2 Context context) throws IOException, InterruptedException { 3 context.write((KEYOUT) key, (VALUEOUT) value); 4 }
代码 1.1
在上面的代码中,输入参数key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map 函数。在这里,map 函数没有处理输入的key、value,直接通过context.write(…)方法输出了,输出的key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。
1.2.2 Reducer类
再看一下Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型,和输出的key、value 类型。看一下reduce 函数定义,如下代码1.2。
1 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 2 ) throws IOException, InterruptedException { 3 for(VALUEIN value: values) { 4 context.write((KEYOUT) key, (VALUEOUT) value); 5 } 6 }
代码 1.2
在上面代码中,reduce 函数的形参key、value 的类型是KEYIN、VALUEIN。要注意这里的value 是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的key,通过调用context.write(…)输出了,这里输出的类型是KEYOUT、VALUEOUT。通常我们会根据业务逻辑覆盖reduce 函数的实现。
二、 MapReduce 执行原理
2.1 MapRduce执行流程
MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中。整个流程如图3.1
图2.1
2.2 Mapper 任务的执行过程
每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。整个Mapper 任务的处理过程又可以分为以下几个阶段,如图3.2
图2.2
在图3.2中,把Mapper 任务的运行过程分为六个阶段。
第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper 进程处理。这里的三个输入片,会有三个Mapper 进程处理。
第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。
第三阶段是调用Mapper 类中的map 方法。第二阶段中解析出来的每一个键值对,调用一次map 方法。如果有1000 个键值对,就会调用1000 次map 方法。每一次调用map 方法会输出零个或者多个键值对。map具体的工作做有我们自己来决定,我们要对map函数进行覆盖,封装我们要进行的操作来实现我们最终的目的。
第四阶段是按照一定的规则对第三阶段的每个Mapper任务输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer 任务运行的数量。默认只有一个Reducer 任务。
第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux 文件中。
第六阶段是对数据进行归约处理,也就是reduce 处理。对于键相等的键值对才会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linxu 文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。
2.3 Reducer执行过程
每个Reducer 任务是一个java 进程。Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,可以分为如图2.3 所示的几个阶段
图2.3
在图3.2中,把Mapper 任务的运行过程分为四个阶段。
第一阶段是Reducer 任务会主动从Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此Reducer 会复制多个Mapper 的输出。
第二阶段是把复制到Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
第三阶段是对排序后的键值对调用reduce 方法。键相等的键值对调用一次reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS 文件中。
在整个MapReduce 程序的执行过程中如图2.4,我可以根据上面的讲解来分析下面MapReducer执行过程,从下图可知每个Mapper任务分了两个区,因此会有两个Reducer任务,最终产生两个HDFS副本。
图 2.4
2.4 键值对的编号
在对Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,为避免混淆,所以我在这里对键值对进行编号,方便大家理解键值对的变化情况。如图2.5
图 2.5
在图2.5 中,对于Mapper 任务输入的键值对,定义为key1 和value1。在map 方法中处理后,输出的键值对,定义为key2 和value2。reduce 方法接收key2 和value2,处理后,输出key3 和value3。在下文讨论键值对时,可能把key1 和value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和value3 简写为<k3,v3>。
2.5 举例:单词计数
该业务要求统计指定文件中的所有单词的出现次数。下面看一下源文件的内容为:
“hello you”
“hello me”
内容很简单,两行文本,每行的单词中间使用空格区分。
分析思路:最直观的想法是使用数据结构Map。解析文件中出现的每个单词,用单词作为key,出现次数作为value。这个思路没有问题,但是在大数据环境下就不行了。我们需要使用MapReduce 来做。根据Mapper 任务和Reducer任务的运行阶段,我们知道在Mapper任务的第二阶段是把文件的每一行转化成键值对,那么第三阶段的map 方法就能取得每一行文本内容,我们可以在map 方法统计本行文本中单词出现的次数,把每个单词的出现次数作为新的键值对输出。在Reducer 任务的第二阶段会对Mapper 任务输出的键值对按照键进行排序,键相等的键值对会调用一次reduce 方法。在这里,“键”就是单词,“值”就是出现次数。因此可以在reduce 方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把这个结果输出。
程序源码如下代码 2.1。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 Configuration conf = new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outPath = new Path(OUT_PATH); 28 if(fileSystem.exists(outPath)){ 29 fileSystem.delete(outPath, true); 30 } 31 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 32 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里 33 34 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 35 job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类 36 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 37 job.setMapOutputValueClass(LongWritable.class); 38 39 job.setPartitionerClass(HashPartitioner.class);//1.3 分区 40 job.setNumReduceTasks(1);//有一个reduce任务运行 41 //1.4 TODO 排序、分组 42 //1.5 TODO 规约 43 job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类 44 job.setOutputKeyClass(Text.class);//指定reduce的输出类型 45 job.setOutputValueClass(LongWritable.class);//2.3 指定写出到哪里 46 FileOutputFormat.setOutputPath(job, outPath);//指定输出文件的格式化类 47 48 job.setOutputFormatClass(TextOutputFormat.class); 49 50 job.waitForCompletion(true);//把job提交给JobTracker运行 51 } 52 53 /** 54 * KEYIN 即k1 表示行的偏移量 55 * VALUEIN 即v1 表示行文本内容 56 * KEYOUT 即k2 表示行中出现的单词 57 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 58 */ 59 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 60 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 61 62 final String line = v1.toString(); 63 final String[] splited = line.split(" "); 64 for (String word : splited) { 65 context.write(new Text(word), new LongWritable(1)); 66 } 67 }; 68 } 69 70 /** 71 * KEYIN 即k2 表示行中出现的单词 72 * VALUEIN 即v2 表示行中出现的单词的次数 73 * KEYOUT 即k3 表示文本中出现的不同单词 74 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 75 * 76 */ 77 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 78 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 79 long times = 0L; 80 for (LongWritable count : v2s) { 81 times += count.get(); 82 } 83 ctx.write(k2, new LongWritable(times)); 84 }; 85 } 86 87 }