MapReduce是一种用于大规模数据集的并行计算编程模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。其主要思想Map(映射)和Reduce(规约)都是从函数是编程语言中借鉴而来的,它可以使程序员在不懂分布式底层的情况下轻松的将自己的程序运行在分布式系统上,极大地降低了分布式计算的门槛。
1、执行步骤(“天龙八部”)
1) map任务处理
① 读取数据文件内容,对每一行内容解析成<k1,v1>键值对,每个键值对调用一次map函数;
② 编写Map映射函数处理逻辑,将输入的<k1,v1>转换成新的<k2,v2>并输出;
③ 对输出的<k2、v2>按照reducer个数以及分区规则进行分区;
④ 对不同分区的数据,按照k2进行排序、分组,将相同的k2的v2放倒一个集合中,转化成<k2,{v2....}>;
⑤ (可选)将分组后的数据进行归约;
2) reduce任务处理
① 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点;
② 对copy过来的来自多个map任务输出的数据<k2,{v2...}>进行排序、合并,编写Reduce归约函数处理逻辑,将接收到的数据处理转化成<k3,v3>;
③ 将reduce输出的结果存储到HDFS文件中;
2、执行流程原理
执行原理图与Map、Reduce详细执行过程如下所示。
图1.2.1 mapreduce执行原理图
图1.2.2 map与reduce过程示意图
Map端处理流程分析:
1) 每个输入分片会交给一个Map任务(是TaskTracker节点上运行的一个Java进程),默认情况下,系统会以HDFS的一个块大小作为一个分片(hadoop2默认128M,配置dfs.blocksize)。Map任务通过InputFormat将输入分片处理成可供Map处理的<k1,v1>键值对。
2) 通过自己的Map处理方法将<k1,v1>处理成<k2,v2>,输出结果会暂时放在一个环形内存缓冲(缓冲区默认大小100M,由mapreduce.task.io.sort.mb属性控制)中,当缓冲区快要溢出时(默认为缓冲区大小的80%,由mapreduce.map.sort.spill.percent属性控制),会在本地操作系统文件系统中创建一个溢出文件(由mapreduce.cluster.local.dir属性控制,默认${hadoop.tmp.dir}/mapred/local),保存缓冲区的数据。溢写默认控制为内存缓冲区的80%,是为了保证在溢写线程把缓冲区那80%的数据写到磁盘中的同时,Map任务还可以继续将结果输出到缓冲区剩余的20%内存中,从而提高任务执行效率。
3) 每次spill将内存数据溢写到磁盘时,线程会根据Reduce任务的数目以及一定的分区规则将数据进行分区,然后分区内再进行排序、分组,如果设置了Combiner,会执行规约操作。
4) 当map任务结束后,可能会存在多个溢写文件,这时候需要将他们合并,合并操作在每个分区内进行,先排序再分组,如果设置了Combiner并且spill文件大于mapreduce.map.combine.minspills值(默认值3)时,会触发Combine操作。每次分组会形成新的键值对<k2,{v2...}>。
5) 合并操作完成后,会形成map端的输出文件,等待reduce来拷贝。如果设置了压缩,则会将输出文件进行压缩,减少网络流量。是否进行压缩,mapreduce.output.fileoutputformat.compress,默认为false。设置压缩库,mapreduce.output.fileoutputformat.compress.codec,默认值org.apache.hadoop.io.compress.DefaultCodec。
Reduce端处理流程分析:
1) Reduce端会从AM那里获取已经执行完的map任务,然后以http的方法将map输出的对应数据拷贝至本地(拷贝最大线程数mapreduce.reduce.shuffle.parallelcopies,默认值5)。每次拷贝过来的数据都存于内存缓冲区中,当数据量大于缓冲区大小(由mapreduce.reduce.shuffle.input.buffer.percent控制,默认0.7)的一定比例(由mapreduce.reduce.shuffle.merge.percent控制,默认0.66)时,则将缓冲区的数据溢写到一个本地磁盘中。由于数据来自多个map的同一个分区,溢写时不需要再分区,但要进行排序和分组,如果设置了Combiner,还会执行Combine操作。溢写过程与map端溢写类似,输出写入可同时进行。
2) 当所有的map端输出该分区数据都已经拷贝完毕时,本地磁盘可能存在多个spill文件,需要将他们再次排序、分组合并,最后形成一个最终文件,作为Reduce任务的输入。此时标志Shuffle阶段结束,然后Reduce任务启动,将最终文件中的数据处理形成新的键值对<k3,v3>。
3) 将生成的数据<k3,v3>输出到HDFS文件中。
1、WordCount简介及hadoop1.1.2版本的写法
WordCount程序被称作MapReduce的入门“Hello World”,要学好MapReduce必须先搞定WordCount。WordCount处理的目的是统计所有文档中不同单词出现的次数。样例代码如下所示(完整项目源码点此下载,使用hadoop版本1.1.2):
package mapreduce; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyWordCount { static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; static final String OUTPUT_PATH = "hdfs://hadoop:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, WordCountApp2.class.getSimpleName()); //1.1 输入目录 FileInputFormat.setInputPaths(job, INPUT_PATH); //指定对输入数据进行格式化处理的类 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的Mapper类 job.setMapperClass(MyMapper.class); //指定map输出的<k,v>类型。如果<k3,v3>类型与<k2,v2>类型一致,此处可以省略 //job.setMapOutputKeyClass(Text.class); //job.setMapOutputValueClass(LongWritable.class); //1.3 分区 //job.setPartitionerClass(HashPartitioner.class); //job.setNumReduceTasks(3); //1.4 排序、分组 //1.5(可选)归约 //job.setCombinerClass(MyReducer.class); //2.2指定自定义的Recude函数 job.setReducerClass(MyReducer.class); //指定输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3指定输出路径 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //指定输出格式化类 //job.setOutputFormatClass(TextOutputFormat.class); //把作业提交给JobTracker运行 job.waitForCompletion(true); } /** * Map方法 * <0,hello you> => <hello,1>,<you,1> * <10,hello me> => <hello,1>,<me,1> */ static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ Text word = new Text(st.nextToken()); context.write(word, new LongWritable(1L)); } }; } /** * Reduce方法 * <hello,{1,1}>,<me,{1}>,<you,{1}> => <hello,2>,<me,1>,<you,1> */ static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { long sum = 0L; for(LongWritable v2 : v2s){ sum += v2.get(); } context.write(k2, new LongWritable(sum)); }; } }