1.1 什么是Hadoop计数器
Haoop是处理大数据的,不适合处理小数据,有些大数据问题是小数据程序是处理不了的,他是一个高延迟的任务,有时处理一个大数据需要花费好几个小时这都是正常的。下面我们说一下Hadoop计数器,Hadoop计数器就相当于我们的日志,而日志可以让我们查看程序运行时的很多状态,而计数器也有这方面的作用。那么就研究一下Hadoop自身的计数器。计数器的程序如代码1.1所示,下面代码还是以内容为“hello you;hell0 me”的单词统计为例。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 26 Configuration conf = new Configuration(); 27 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29 final Path outPath = new Path(OUT_PATH); 30 31 if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 35 36 //1.1指定读取的文件位于哪里 37 FileInputFormat.setInputPaths(job, INPUT_PATH); 38 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 39 40 //1.2 指定自定义的map类 41 job.setMapperClass(MyMapper.class); 42 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。 43 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 44 45 //1.3 分区 46 job.setPartitionerClass(HashPartitioner.class); 47 job.setNumReduceTasks(1);//有一个reduce任务运行 48 49 //2.2 指定自定义reduce类 50 job.setReducerClass(MyReducer.class); 51 52 job.setOutputKeyClass(Text.class);//指定reduce的输出类型 53 job.setOutputValueClass(LongWritable.class); 54 55 //2.3 指定写出到哪里 56 FileOutputFormat.setOutputPath(job, outPath); 57 job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类 58 59 job.waitForCompletion(true);//把job提交给JobTracker运行 60 } 61 62 /** 63 * KEYIN 即k1 表示行的偏移量 64 * VALUEIN 即v1 表示行文本内容 65 * KEYOUT 即k2 表示行中出现的单词 66 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 67 */ 68 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 69 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 70 final String line = v1.toString(); 71 final String[] splited = line.split("\t"); 72 for (String word : splited) { 73 context.write(new Text(word), new LongWritable(1)); 74 } 75 }; 76 } 77 78 /** 79 * KEYIN 即k2 表示行中出现的单词 80 * VALUEIN 即v2 表示行中出现的单词的次数 81 * KEYOUT 即k3 表示文本中出现的不同单词 82 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 83 * 84 */ 85 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 86 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 87 long times = 0L; 88 for (LongWritable count : v2s) { 89 times += count.get(); 90 } 91 ctx.write(k2, new LongWritable(times)); 92 }; 93 } 94 95 }