1.1 单词统计
回顾我们以前单词统计的例子，如代码1.1所示。
package counter;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Classic MapReduce word-count job: reads text lines from {@code INPUT_PATH},
 * splits each line into words, and writes each distinct word with its total
 * number of occurrences to {@code OUT_PATH}.
 */
public class WordCountApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
    static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete a stale output directory first; otherwise job submission fails
        // because the output path already exists.
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        final Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        // Without this, the job jar is not shipped to the cluster and the task
        // nodes throw ClassNotFoundException for MyMapper/MyReducer.
        job.setJarByClass(WordCountApp.class);

        // 1.1 where the input files live
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // how the input is parsed: each line becomes an <offset, line-text> pair
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 the custom map class and its output <k2, v2> types
        // (may be omitted when <k3, v3> types match <k2, v2>)
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 partitioning: hash partitioner feeding a single reduce task
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 2.2 the custom reduce class and the final output <k3, v3> types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 where the results are written, and in what format
        FileOutputFormat.setOutputPath(job, outPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job and propagate its success/failure as the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * KEYIN   (k1): byte offset of the line within the input file
     * VALUEIN (v1): the text of the line
     * KEYOUT  (k2): a word occurring in the line
     * VALUEOUT(v2): the count for that single occurrence, always 1
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Reused across map() calls to avoid allocating one object per record.
        private final Text word = new Text();
        private static final LongWritable ONE = new LongWritable(1);

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws java.io.IOException, InterruptedException {
            // final Counter helloCounter = context.getCounter("Sensitive Words", "hello");

            final String line = v1.toString();
            /* if (line.contains("hello")) {
                // record that the sensitive word appeared in this line
                helloCounter.increment(1L);
            } */
            // \s+ collapses runs of whitespace; split(" ") would produce empty
            // tokens on consecutive spaces and count "" as a word.
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    /**
     * KEYIN   (k2): a word
     * VALUEIN (v2): the per-occurrence counts emitted for that word
     * KEYOUT  (k3): each distinct word of the input
     * VALUEOUT(v3): the total number of occurrences of that word
     */
    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        // Reused across reduce() calls to avoid per-key allocation.
        private final LongWritable total = new LongWritable();

        @Override
        protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx)
                throws java.io.IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            total.set(times);
            ctx.write(k2, total);
        }
    }
}