The data used in this section can be downloaded from: http://pan.baidu.com/s/1bnfELmZ
As we know, sorting and grouping form the fourth step on the Mapper side of MapReduce, and both operate on the key. The following examples illustrate this; the data and tasks are shown in Figures 1.1 and 1.2 below.
# First sort by the first column in ascending order; when the first
# column is equal, sort by the second column in ascending order
# (columns are tab-separated)
3   3
3   2
3   1
2   2
2   1
1   1
-------------------
# Result
1   1
2   1
2   2
3   1
3   2
3   3
Figure 1.1 Sorting
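The first-column ordering in Figure 1.1 is exactly what MapReduce's shuffle gives for free: map output keys are sorted by calling compareTo on the key type, and for LongWritable that comparison is ascending numeric order. Below is a minimal sketch of my own (KeyOrderDemo is a hypothetical name, not code from this article) showing the comparison the framework relies on:

import org.apache.hadoop.io.LongWritable;

// Minimal sketch: the shuffle orders map output keys with
// WritableComparable.compareTo; LongWritable compares numerically.
public class KeyOrderDemo {
    public static void main(String[] args) {
        LongWritable three = new LongWritable(3);
        LongWritable two = new LongWritable(2);
        // Positive result because 3 > 2, so the framework places
        // the key 2 before the key 3, i.e. ascending order.
        System.out.println(three.compareTo(two));
    }
}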
# When the first column is the same, find the minimum of the second column
3   3
3   2
3   1
2   2
2   1
1   1
-------------------
# Result
3   1
2   1
1   1
Figure 1.2 Grouping
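The grouping task in Figure 1.2 maps naturally onto a reduce call: all values sharing one key are handed to the reducer together, so finding the minimum is a single pass over them. Below is a minimal sketch of my own (the class name MinReducer is hypothetical; the article's grouping code may differ):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch for the Figure 1.2 task: for each first-column key, emit the
// minimum of its second-column values.
public class MinReducer
        extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long min = Long.MAX_VALUE;
        for (LongWritable v : values) {
            min = Math.min(min, v.get()); // track the smallest value in this group
        }
        context.write(key, new LongWritable(min));
    }
}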
1. Sorting Algorithm
1.1 MapReduce's Default Sorting Algorithm
The code using MapReduce's default sorting algorithm is shown in Listing 1.1 below; in it, the first column is used as the key and the second column as the value.
package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
    private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outpath = new Path(OUT_PATH);
        // Remove the output directory if it already exists, otherwise the job fails
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }

        final Job job = new Job(conf, SortApp.class.getSimpleName());

        // 1.1 Specify the input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class); // the class that parses the input file

        // 1.2 Specify the custom Mapper class
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class); // the types of <k2, v2>
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 Specify the partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 1.4 TODO sorting and grouping (the defaults are used here)

        // 1.5 TODO (optional) combining

        // 2.2 Specify the custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class); // the types of <k3, v3>
        job.setOutputValueClass(LongWritable.class);

        // 2.3 Specify where the output goes
        FileOutputFormat.setOutputPath(job, outpath);
        job.setOutputFormatClass(TextOutputFormat.class); // the class that formats the output file
        job.waitForCompletion(true); // submit the job to the JobTracker and wait
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is tab-separated: the first column becomes the
            // key k2, the second column becomes the value v2
            final String[] splited = value.toString().split("\t");
            final long k2 = Long.parseLong(splited[0]);
            final long v2 = Long.parseLong(splited[1]);
            context.write(new LongWritable(k2), new LongWritable(v2));
        }
    }

    static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(LongWritable k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted by the shuffle; write each value back out
            for (LongWritable v2 : v2s) {
                context.write(k2, v2);
            }
        }
    }
}

Listing 1.1 MapReduce default sorting
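Note that the default sort orders only the keys, i.e. the first column; the values handed to a reduce call carry no ordering guarantee. For the Figure 1.1 data, this job is therefore only guaranteed to emit the keys in ascending order, while the second column within each key may come out in any order. A possible output, sketched by hand rather than captured from a run:

1   1
2   2
2   1
3   3
3   2
3   1

To also order the second column, as Figure 1.1 requires, both columns have to take part in the key comparison, which is what a custom composite key achieves.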