本节所用到的数据下载地址为:http://pan.baidu.com/s/1bnfELmZ

  我们知道排序分组是MapReduce中Mapper端的第四步,其中分组排序都是基于Key的,我们可以通过下面这几个例子来体现出来。其中的数据和任务如下图1.1,1.2所示。

#首先按照第一列升序排列,当第一列相同时,第二列升序排列
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
1    1
2    1
2    2
3    1
3    2
3    3

图 1.1 排序

#当第一列相同时,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
3    1
2    1
1    1

图 1.2 分组

一、 排序算法

1.1 MapReduce默认排序算法

  使用MapReduce默认排序算法代码如下1.1所示,在代码中我将第一列作为键,第二列作为值。

 1 package sort;
 2 
 3 import java.io.IOException;
 4 import java.net.URI;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.FileStatus;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20 
21 public class SortApp {
22     private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
23     private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
24     public static void main(String[] args) throws Exception {
25         Configuration conf=new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outpath = new Path(OUT_PATH);
28         if(fileSystem.exists(outpath)){
29             fileSystem.delete(outpath,true);
30         }
31         
32         final Job job = new Job(conf,SortApp.class.getSimpleName());
33         
34         //1.1 指定输入文件路径
35         FileInputFormat.setInputPaths(job, INPUT_PATH);        
36         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
37                 
38         //1.2指定自定义的Mapper类
39         job.setMapperClass(MyMapper.class);        
40         job.setMapOutputKeyClass(LongWritable.class);//指定输出<k2,v2>的类型
41         job.setMapOutputValueClass(LongWritable.class);
42                 
43         //1.3 指定分区类
44         job.setPartitionerClass(HashPartitioner.class);
45         job.setNumReduceTasks(1);
46                 
47         //1.4 TODO 排序、分区
48                 
49         //1.5  TODO (可选)合并
50                 
51         //2.2 指定自定义的reduce类
52         job.setReducerClass(MyReducer.class);        
53         job.setOutputKeyClass(LongWritable.class);//指定输出<k3,v3>的类型
54         job.setOutputValueClass(LongWritable.class);
55                 
56         //2.3 指定输出到哪里
57         FileOutputFormat.setOutputPath(job, outpath);        
58         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类                        
59         job.waitForCompletion(true);//把代码提交给JobTracker执行        
60     }
61     static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{
62 
63         @Override
64         protected void map(
65                 LongWritable key,
66                 Text value,
67                 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
68                 throws IOException, InterruptedException {
69             final String[] splited = value.toString().split("\t");
70             final long k2 = Long.parseLong(splited[0]);
71             final long v2 = Long.parseLong(splited[1]);
72             context.write(new LongWritable(k2),new LongWritable(v2));
73         }    
74     }
75     static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{
76 
77         @Override
78         protected void reduce(
79                 LongWritable k2,
80                 Iterable<LongWritable> v2s,
81                 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
82                 throws IOException, InterruptedException {
83             for(LongWritable v2:v2s){
84                 context.write(k2, v2);
85             }            
86         }    
87     }
88 }
View Code

相关文章:

  • 2022-03-02
  • 2022-12-23
  • 2022-12-23
  • 2021-08-11
  • 2022-12-23
  • 2022-12-23
  • 2021-10-30
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2021-07-20
  • 2022-01-02
  • 2021-07-27
  • 2021-11-20
  • 2021-04-10
  • 2022-02-26
相关资源
相似解决方案