MapReduce是什么
首先让我们来重温一下 hadoop 的四大组件:
HDFS:分布式存储系统
MapReduce:分布式计算系统
YARN:hadoop 的资源调度系统
Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布 式运算程序,并发运行在一个 Hadoop 集群上
为什么需要 MapReduce
1、海量数据在单机上处理因为硬件资源限制,无法胜任
2、而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
3、引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理
设想一个海量数据场景下的数据计算需求:
| 单机版:磁盘受限,内存受限,计算能力受限 |
|
分布式版: 1、 数据存储的问题,hadoop 提供了 hdfs 解决了数据存储这个问题 2、 运算逻辑至少要分为两个阶段,先并发计算(map),然后汇总(reduce)结果 3、 这两个阶段的计算如何启动?如何协调? 4、 运算程序到底怎么执行?数据找程序还是程序找数据? 5、 如何分配两个阶段的多个运算任务? 6、 如何管理任务的执行过程中间状态,如何容错? 7、 如何监控? 8、 出错如何处理?抛异常?重试? |
可见在程序由单机版扩成分布式版时,会引入大量的复杂工作。为了提高开发效率,可以将 分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会 涉及的到的内容都封装进了,让用户只用专注自己的业务逻辑代码的开发。它对应以上问题 的整体结构如下:
MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild
ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild
MapReduce做什么
简单地讲,MapReduce可以做大数据处理。所谓大数据处理,即以价值为导向,对大数据加工、挖掘和优化等各种处理。
MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。
(1)Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。
(2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。
MapReduce 程序运行演示
在 MapReduce 组件里,官方给我们提供了一些样例程序,其中非常有名的就是 wordcount 和 pi 程序。这些 MapReduce 程序的代码都在 hadoop-mapreduce-examples-2.7.5.jar 包里,这 个 jar 包在 hadoop 安装目录下的/share/hadoop/mapreduce/目录里 下面我们使用 hadoop 命令来试跑例子程序,看看运行效果
MapReduce 示例 pi 的程序
[hadoop@hadoop1 ~]$ cd apps/hadoop-2.7.5/share/hadoop/mapreduce/ [hadoop@hadoop1 mapreduce]$ pwd /home/hadoop/apps/hadoop-2.7.5/share/hadoop/mapreduce [hadoop@hadoop1 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.5.jar pi 5 5
MapReduce 示例 wordcount 的程序
[hadoop@hadoop1 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.7.5.jar wordcount /wc/input1/ /wc/output1/
查看结果
[hadoop@hadoop1 mapreduce]$ hadoop fs -cat /wc/output1/part-r-00000
其他程序
那除了这两个程序以外,还有没有官方提供的其他程序呢,还有就是它们的源码在哪里呢?
我们打开 mapreduce 的源码工程,里面有一个 hadoop-mapreduce-project 项目:
里面有一个例子程序的子项目:hadoop-mapreduce-examples
其中 src 是例子程序源码目录,pom.xml 是该项目的 maven 管理配置文件,我们打开该文件, 找到第 127 行,它告诉了我们例子程序的主程序入口:
找到src\main\java\org\apache\hadoop\examples目录
打开主入口程序,看源代码:
找到这一步,我们就能知道其实 wordcount 程序的实际程序就是 WordCount.class,这就是我 们想要找的例子程序的源码。
WordCount.java源码
1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.examples; 19 20 import java.io.IOException; 21 import java.util.StringTokenizer; 22 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.io.IntWritable; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.mapreduce.Job; 28 import org.apache.hadoop.mapreduce.Mapper; 29 import org.apache.hadoop.mapreduce.Reducer; 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 32 import org.apache.hadoop.util.GenericOptionsParser; 33 34 public class WordCount { 35 36 public static class TokenizerMapper 37 extends Mapper<Object, Text, Text, IntWritable>{ 38 39 private final static IntWritable one = new IntWritable(1); 40 private Text word = new Text(); 41 42 public void map(Object key, Text value, Context context 43 ) throws IOException, InterruptedException { 44 StringTokenizer itr = new StringTokenizer(value.toString()); 45 while (itr.hasMoreTokens()) { 46 word.set(itr.nextToken()); 47 context.write(word, one); 48 } 49 } 50 } 51 52 public static class IntSumReducer 53 extends Reducer<Text,IntWritable,Text,IntWritable> { 54 private IntWritable result = new IntWritable(); 55 56 public void reduce(Text key, Iterable<IntWritable> values, 57 Context context 58 ) throws IOException, InterruptedException { 59 int sum = 0; 60 for (IntWritable val : values) { 61 sum += val.get(); 62 } 63 result.set(sum); 64 context.write(key, result); 65 } 66 } 67 68 public static void main(String[] args) throws Exception { 69 Configuration conf = new Configuration(); 70 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 71 if (otherArgs.length < 2) { 72 System.err.println("Usage: wordcount <in> [<in>...] <out>"); 73 System.exit(2); 74 } 75 Job job = Job.getInstance(conf, "word count"); 76 job.setJarByClass(WordCount.class); 77 job.setMapperClass(TokenizerMapper.class); 78 job.setCombinerClass(IntSumReducer.class); 79 job.setReducerClass(IntSumReducer.class); 80 job.setOutputKeyClass(Text.class); 81 job.setOutputValueClass(IntWritable.class); 82 for (int i = 0; i < otherArgs.length - 1; ++i) { 83 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 84 } 85 FileOutputFormat.setOutputPath(job, 86 new Path(otherArgs[otherArgs.length - 1])); 87 System.exit(job.waitForCompletion(true) ? 0 : 1); 88 } 89 }