1 概述

该瞅瞅MapReduce的内部运行原理了,以前只知道个皮毛,再不搞搞,不然怎么死的都不晓得。下文会以2.4版本中的WordCount这个经典例子作为分析的切入点,一步步来看里面到底是个什么情况。

2 为什么要使用MapReduce

Map/Reduce,是一种模式,适合解决并行计算的问题,比如TopN、贝叶斯分类等。注意,是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了。

从名字可以看出,这种模式有两个步骤,Map和Reduce。Map即数据的映射,用于把一组键值对映射成另一组新的键值对,而Reduce这个东东,以Map阶段的输出结果作为输入,对数据做化简、合并等操作。

而MapReduce是Hadoop生态系统中基于底层HDFS的一个计算框架,它的上层又可以是Hive、Pig等数据仓库框架,也可以是Mahout这样的数据挖掘工具。由于MapReduce依赖于HDFS,其运算过程中的数据等会保存到HDFS上,把对数据集的计算分发给各个节点,并将结果进行汇总,再加上各种状态汇报、心跳汇报等,其只适合做离线计算。和实时计算框架Storm、Spark等相比,速度上没有优势。旧的Hadoop生态几乎是以MapReduce为核心的,但是慢慢的发展,其扩展性差、资源利用率低、可靠性等问题都越来越让人觉得不爽,于是才产生了Yarn这个新的东东,并且二代版的Hadoop生态都是以Yarn为核心。Storm、Spark等都可以基于Yarn使用。

3 怎么运行MapReduce

明白了哪些地方可以使用这个牛叉的MapReduce框架,那该怎么用呢?Hadoop的MapReduce源码给我们提供了范例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在写完类似的代码后,打包成jar,在HDFS的客户端运行:

bin/hadoop jar mapreduce_examples.jar mainClass args

即可。当然,也可以在IDE(如Eclipse)中,进行远程运行、调试程序。

至于,HadoopStreaming方式,网上有很多。我们这里只讨论Java的实现。

4 如何编写MapReduce程序

    如前文所说,MapReduce中有Map和Reduce,在实现MapReduce的过程中,主要分为这两个阶段,分别以两类函数进行展现,一个是map函数,一个是reduce函数。map函数的参数是一个<key,value>键值对,其输出结果也是键值对,reduce函数以map的输出作为输入进行处理。

4.1 代码构成

    实际的代码中,需要三个元素,分别是Map、Reduce、运行任务的代码。这里的Map类是继承了org.apache.hadoop.mapreduce.Mapper,并实现其中的map方法;而Reduce类是继承了org.apache.hadoop.mapreduce.Reducer,实现其中的reduce方法。至于运行任务的代码,就是我们程序的入口。

    下面是Hadoop提供的WordCount源码。

 1 /**
 2  * Licensed to the Apache Software Foundation (ASF) under one
 3  * or more contributor license agreements.  See the NOTICE file
 4  * distributed with this work for additional information
 5  * regarding copyright ownership.  The ASF licenses this file
 6  * to you under the Apache License, Version 2.0 (the
 7  * "License"); you may not use this file except in compliance
 8  * with the License.  You may obtain a copy of the License at
 9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.examples;
19 
20 import java.io.IOException;
21 import java.util.StringTokenizer;
22 
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.IntWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.Job;
28 import org.apache.hadoop.mapreduce.Mapper;
29 import org.apache.hadoop.mapreduce.Reducer;
30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
32 import org.apache.hadoop.util.GenericOptionsParser;
33 
34 public class WordCount {
35 
36   public static class TokenizerMapper 
37        extends Mapper<Object, Text, Text, IntWritable>{
38     
39     private final static IntWritable one = new IntWritable(1);
40     private Text word = new Text();
41       
42     public void map(Object key, Text value, Context context
43                     ) throws IOException, InterruptedException {
44       StringTokenizer itr = new StringTokenizer(value.toString());
45       while (itr.hasMoreTokens()) {
46         word.set(itr.nextToken());
47         context.write(word, one);
48       }
49     }
50   }
51   
52   public static class IntSumReducer 
53        extends Reducer<Text,IntWritable,Text,IntWritable> {
54     private IntWritable result = new IntWritable();
55 
56     public void reduce(Text key, Iterable<IntWritable> values, 
57                        Context context
58                        ) throws IOException, InterruptedException {
59       int sum = 0;
60       for (IntWritable val : values) {
61         sum += val.get();
62       }
63       result.set(sum);
64       context.write(key, result);
65     }
66   }
67 
68   public static void main(String[] args) throws Exception {
69     Configuration conf = new Configuration();
70     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
71     if (otherArgs.length != 2) {
72       System.err.println("Usage: wordcount <in> <out>");
73       System.exit(2);
74     }
75     Job job = new Job(conf, "word count");
76     job.setJarByClass(WordCount.class);
77     job.setMapperClass(TokenizerMapper.class);
78     job.setCombinerClass(IntSumReducer.class);
79     job.setReducerClass(IntSumReducer.class);
80     job.setOutputKeyClass(Text.class);
81     job.setOutputValueClass(IntWritable.class);
82     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
83     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
84     System.exit(job.waitForCompletion(true) ? 0 : 1);
85   }
86 }
View Code

相关文章:

  • 2021-05-21
  • 2022-12-23
  • 2021-06-06
  • 2021-10-14
  • 2021-04-24
  • 2021-12-09
  • 2021-04-01
  • 2021-11-12
猜你喜欢
  • 2021-10-11
  • 2021-06-05
  • 2021-04-05
  • 2022-01-31
  • 2021-09-25
  • 2022-12-23
  • 2021-11-12
相关资源
相似解决方案