1 Overview
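Time to take a proper look at how MapReduce works on the inside. So far I've only scratched the surface, and if I keep putting it off I won't even know what hit me when something breaks. This post uses the classic WordCount example from version 2.4 as the entry point and walks through, step by step, what is actually going on in there.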
2 Why use MapReduce
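Map/Reduce is a programming pattern suited to problems that can be computed in parallel, such as TopN or Bayesian classification. Note: parallel, not iterative. Problems like hierarchical clustering, where each round depends on the result of the previous one, are a poor fit.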
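To make the "parallel" part concrete, here is a minimal plain-Java sketch of TopN. It is not Hadoop code: the class name TopNSketch and its methods are made up for illustration, and parallelStream merely stands in for cluster nodes working on separate splits. Each partition finds its own top N without talking to the others, and a final merge picks the global top N from those small candidate lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Hadoop.
class TopNSketch {

    // "Map" side: each partition independently keeps only its N largest values.
    static List<Integer> localTopN(List<Integer> partition, int n) {
        return partition.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }

    // "Reduce" side: merge the per-partition candidates and keep the global top N.
    static List<Integer> mergeTopN(List<List<Integer>> candidates, int n) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> c : candidates) {
            all.addAll(c);
        }
        return localTopN(all, n);
    }

    static List<Integer> topN(List<List<Integer>> partitions, int n) {
        // parallelStream stands in for nodes that process their splits independently.
        List<List<Integer>> candidates = partitions.parallelStream()
                .map(p -> localTopN(p, n))
                .collect(Collectors.toList());
        return mergeTopN(candidates, n);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 42, 7),
                Arrays.asList(19, 3, 88),
                Arrays.asList(61, 2, 9));
        System.out.println(topN(partitions, 3)); // prints [88, 61, 42]
    }
}
```

The point of the sketch is that no partition needs another partition's data until the final merge, which is exactly the property an iterative algorithm lacks.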
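As the name suggests, the pattern has two steps, Map and Reduce. Map is a mapping over the data: it turns one set of key-value pairs into a new set of key-value pairs. Reduce takes the output of the Map step as its input and condenses or merges it.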
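The two steps can be sketched in plain Java without any Hadoop involved. Everything below (the class MapReduceConcept, the in-memory grouping, using a running character offset as the input key) is an assumption made for illustration; a real framework does the grouping between the two steps for you.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Hadoop.
class MapReduceConcept {

    // Map step: one (offset, line) pair in, many (word, 1) pairs out.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Reduce step: all values collected for one key in, one merged value out.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Group the map output by key before handing it to reduce.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        long offset = 0;
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(offset, line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
            offset += line.length() + 1;
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello hadoop")));
        // prints {hadoop=1, hello=2, world=1}
    }
}
```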
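MapReduce, in turn, is a computing framework in the Hadoop ecosystem that sits on top of HDFS. Above it you find data-warehouse frameworks such as Hive and Pig, as well as data-mining tools such as Mahout. Because MapReduce depends on HDFS, a job's input and final output live on HDFS (intermediate map output is written to local disk on each node). The framework distributes the computation over the data set to the nodes and gathers the results. Add all the status reports and heartbeats on top of that, and it is really only suitable for offline (batch) computing. Compared with frameworks such as Storm and Spark, which target real-time or in-memory processing, it has no advantage in speed.
The old Hadoop ecosystem was built almost entirely around MapReduce. As it grew, though, its poor scalability, low resource utilization, and reliability problems became more and more annoying, which is how YARN came about. The second-generation Hadoop ecosystem is built around YARN, and Storm, Spark, and others can all run on top of it.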
3 How to run MapReduce
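Now that it's clear where this mighty MapReduce framework fits, how do you actually use it? The Hadoop MapReduce source gives us examples: its hadoop-mapreduce-examples subproject contains Java versions of MapReduce programs. Once you have written similar code, package it as a jar and run it from an HDFS client:
bin/hadoop jar mapreduce_examples.jar mainClass args
That's it. You can also run and debug the program remotely from an IDE such as Eclipse.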
As for the Hadoop Streaming approach, there is plenty of material online. Here we only discuss the Java implementation.
4 How to write a MapReduce program
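As mentioned earlier, MapReduce consists of Map and Reduce. An implementation is split into these two phases, each expressed as a function: a map function and a reduce function. The map function takes a <key,value> pair as its argument and outputs key-value pairs as well; the reduce function takes the map output as its input and processes it.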
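The shape of those two functions can be written down as types. The sketch below is plain Java under loud assumptions: MapFn, ReduceFn, and run are hypothetical names invented here, not Hadoop's API, and the "year,temperature" record format is made up for the example. It only shows how the output types of map have to line up with the input types of reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical illustration class; not part of Hadoop.
class SignatureSketch {

    // map: one (K1, V1) pair in, any number of (K2, V2) pairs out via emit.
    interface MapFn<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> emit);
    }

    // reduce: one K2 key plus all of its V2 values in, (K3, V3) pairs out via emit.
    interface ReduceFn<K2, V2, K3, V3> {
        void reduce(K2 key, Iterable<V2> values, BiConsumer<K3, V3> emit);
    }

    // Tiny local runner: apply map, group by key in sorted order, apply reduce.
    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> Map<K3, V3> run(
            Map<K1, V1> input, MapFn<K1, V1, K2, V2> mapper, ReduceFn<K2, V2, K3, V3> reducer) {
        Map<K2, List<V2>> grouped = new TreeMap<>();
        for (Map.Entry<K1, V1> e : input.entrySet()) {
            mapper.map(e.getKey(), e.getValue(),
                    (k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        Map<K3, V3> output = new LinkedHashMap<>();
        for (Map.Entry<K2, List<V2>> e : grouped.entrySet()) {
            reducer.reduce(e.getKey(), e.getValue(), output::put);
        }
        return output;
    }

    // Example job: records look like "year,temperature"; find the maximum per year.
    static Map<String, Integer> maxPerYear(List<String> records) {
        Map<Integer, String> input = new LinkedHashMap<>();
        for (int i = 0; i < records.size(); i++) {
            input.put(i, records.get(i)); // key = line number, value = record text
        }
        MapFn<Integer, String, String, Integer> mapper = (lineNo, record, emit) -> {
            String[] parts = record.split(",");
            emit.accept(parts[0], Integer.parseInt(parts[1]));
        };
        ReduceFn<String, Integer, String, Integer> reducer = (year, temps, emit) -> {
            int max = Integer.MIN_VALUE;
            for (int t : temps) {
                max = Math.max(max, t);
            }
            emit.accept(year, max);
        };
        return run(input, mapper, reducer);
    }

    public static void main(String[] args) {
        System.out.println(maxPerYear(Arrays.asList("1950,22", "1950,31", "1951,18")));
        // prints {1950=31, 1951=18}
    }
}
```

Notice that the input key (a line number here) is simply ignored by the mapper, which is common: the interesting key is usually the one map emits.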
4.1 Code structure
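In real code you need three pieces: a Map class, a Reduce class, and the code that runs the job. The Map class extends org.apache.hadoop.mapreduce.Mapper and overrides its map method; the Reduce class extends org.apache.hadoop.mapreduce.Reducer and overrides its reduce method. The code that runs the job is the entry point of our program.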
Below is the WordCount source that ships with Hadoop.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
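One detail in main is easy to miss: IntSumReducer is registered both as the combiner and as the reducer. The plain-Java sketch below illustrates why that works for summing. It is a simulation under assumptions, not Hadoop code: the class CombinerSketch is made up, each string stands in for one input split, and the combine step is assumed to run exactly once per split (in real Hadoop the combiner may run zero or more times, so it must not change the final result).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Hadoop.
class CombinerSketch {

    // Mirrors TokenizerMapper plus a local sum: tokenize one split, pre-aggregate its counts.
    static Map<String, Integer> mapAndCombine(String split) {
        Map<String, Integer> partial = new TreeMap<>();
        StringTokenizer itr = new StringTokenizer(split);
        while (itr.hasMoreTokens()) {
            partial.merge(itr.nextToken(), 1, Integer::sum);
        }
        return partial;
    }

    // Mirrors IntSumReducer: add up whatever counts arrive for each word.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> p : partials) {
            p.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> partials = Arrays.asList(
                mapAndCombine("a b a"),
                mapAndCombine("b a c"));
        System.out.println(partials);         // prints [{a=2, b=1}, {a=1, b=1, c=1}]
        System.out.println(reduce(partials)); // prints {a=3, b=2, c=1}
    }
}
```

Because addition is associative and commutative, summing partial sums gives the same totals as summing the raw ones, and fewer pairs have to travel between the map side and the reduce side. An operation like averaging would not survive this trick unchanged.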