1.1 单词统计

  回顾我们以前单词统计的例子,如代码1.1所示。

 1 package counter;
 2 
 3 import java.net.URI;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19 
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out";
23     
24     public static void main(String[] args) throws Exception {
25         
26         Configuration conf = new Configuration();
27         
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30         
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }
34         
35         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
36         
37         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里
38         
39         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
40         
41         job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类
42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
43         job.setMapOutputValueClass(LongWritable.class);
44         
45         job.setPartitionerClass(HashPartitioner.class);//1.3 分区
46         job.setNumReduceTasks(1);//有一个reduce任务运行
47         
48         job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类
49         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
50         job.setOutputValueClass(LongWritable.class);
51         
52         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定写出到哪里
53         
54         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
55         
56         job.waitForCompletion(true);//把job提交给JobTracker运行
57     }
58     
59     /**
60      * KEYIN    即k1        表示行的偏移量
61      * VALUEIN    即v1        表示行文本内容
62      * KEYOUT    即k2        表示行中出现的单词
63      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
64      */
65     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
66         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
67         //    final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
68             
69             final String line = v1.toString();
70         /*    if(line.contains("hello")){
71                 //记录敏感词出现在一行中
72                 helloCounter.increment(1L);
73             }*/
74             final String[] splited = line.split(" ");
75             for (String word : splited) {
76                 context.write(new Text(word), new LongWritable(1));
77             }
78         };
79     }
80     
81     /**
82      * KEYIN    即k2        表示行中出现的单词
83      * VALUEIN    即v2        表示行中出现的单词的次数
84      * KEYOUT    即k3        表示文本中出现的不同单词
85      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
86      *
87      */
88     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
89         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
90             long times = 0L;
91             for (LongWritable count : v2s) {
92                 times += count.get();
93             }
94             ctx.write(k2, new LongWritable(times));
95         };
96     }
97         
98 }
View Code

相关文章: