1.WordCount(统计单词)
经典的运用MapReduce编程模型的实例
1.1 Description
给定一系列的单词/数据,输出每个单词/数据的数量
1.2 Sample
a is b is not c
b is a is not d
1.3 Output
a:2
b:2
c:1
d:1
is:4
not:2
1.4 Solution
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that counts how many times each whitespace-separated token
 * occurs in the input.
 *
 * <p>Usage: {@code wordcount <in> <out>}. An existing {@code <out>} path is
 * removed before the job is submitted.
 */
public class WordCount {

  /**
   * Mapper that emits {@code <token, 1>} (i.e. {@code <Text, IntWritable>})
   * for every token of each input value.
   */
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    // The constant count 1, wrapped as an IntWritable and shared by every emitted pair.
    private final static IntWritable one = new IntWritable(1);
    // Reused output key; overwritten for each token.
    private final Text word = new Text();

    /**
     * Splits {@code value} into tokens and writes {@code <token, 1>} for each.
     *
     * @param key     input key; not used
     * @param value   the text to tokenize
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // StringTokenizer's default delimiters are whitespace characters.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  /**
   * Reducer that receives {@code <token, [counts...]>} and emits
   * {@code <token, sum of counts>} (i.e. {@code <Text, IntWritable>}).
   * It is also registered as the job's combiner in {@link #main}.
   */
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Reused output value; overwritten for each key.
    private final IntWritable result = new IntWritable();

    /**
     * Adds up all counts for {@code key} and writes the total.
     *
     * @param key     the token being counted
     * @param values  the partial counts for this token
     * @param context sink for the emitted pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures and runs the word-count job, then exits the JVM with status 0
   * on success, 1 on job failure, or 2 on a usage error.
   *
   * @param args generic Hadoop options followed by {@code <in> <out>}
   * @throws Exception if job setup or execution fails
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }

    Path inputPath = new Path(otherArgs[0]);
    Path outputPath = new Path(otherArgs[1]);

    // Remove an existing output directory. This goes through the FileSystem
    // that owns the output Path rather than java.io.File: the output may live
    // on HDFS (or another non-local filesystem), which java.io.File cannot see.
    FileSystem outputFs = outputPath.getFileSystem(conf);
    if (outputFs.exists(outputPath)) {
      outputFs.delete(outputPath, true);
    }

    // NOTE(review): this constructor is deprecated in Hadoop 2.x and later in
    // favour of Job.getInstance(conf, "word count") - confirm the target
    // Hadoop version before switching.
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  /**
   * Deletes {@code path} from the local filesystem if it exists, including
   * everything beneath it when it is a directory.
   *
   * @param path local filesystem path to remove
   */
  public static void judgeFileExist(String path) {
    File file = new File(path);
    if (file.exists()) {
      deleteFileDir(file);
    }
  }

  /**
   * Recursively deletes a local file or directory. Deletion is best-effort:
   * entries that cannot be listed or removed are skipped silently.
   *
   * @param path local file or directory to remove
   */
  public static void deleteFileDir(File path) {
    if (path.isDirectory()) {
      // File.list() returns null on an I/O error or an unreadable directory.
      String[] children = path.list();
      if (children != null) {
        for (String child : children) {
          deleteFileDir(new File(path, child));
        }
      }
    }
    path.delete();
  }

}