- 分组:相同key的value进行分组
例子:如下输入输出,右边的第一列没有重复值,第二列取的是当第一列相同时第二列的最大值
分析:首先确定<k3,v3>,k3的选择两种方式,
方法1.前两列都作为k3
方法2.两列分别是k3和v3,此种情况下k2和v2分别是什么?第一列为k2,第二列为v2;那么最后如何转化为k3,v3呢?思路是从v2s中取最大值作为v3,k2直接作为k3输出。
第一部分:方法二达到任务目的
(1)自定义Mapper
1 private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{ 2 IntWritable k2= new IntWritable(); 3 IntWritable v2= new IntWritable(); 4 @Override 5 protected void map(LongWritable key, Text value, 6 Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context) 7 throws IOException, InterruptedException { 8 String[] splited = value.toString().split("\t"); 9 k2.set(Integer.parseInt(splited[0])); 10 v2.set(Integer.parseInt(splited[1])); 11 context.write(k2, v2); 12 } 13 }
(2)自定义Reduce
//按照k2进行排序,分组(分为3个组,reduce函数被调用3次,分几组调用几次)
//分组为3-{3,2,1}, 2-{2,1},1-{1}
1 private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ 2 IntWritable v3 = new IntWritable(); 3 @Override 4 protected void reduce(IntWritable k2, Iterable<IntWritable> v2s, 5 Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) 6 throws IOException, InterruptedException { 7 int max=Integer.MIN_VALUE; 8 for (IntWritable v2 : v2s) { 9 if (v2.get()>max) { 10 max=v2.get(); 11 } 12 } 13 //每个组求得一个最大值可得到结果的序列 14 v3.set(max); 15 context.write(k2, v3); 16 } 17 }
(3)组合MapReduce
1 public static void main(String[] args) throws Exception { 2 Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName()); 3 job.setJarByClass(GroupTest.class); 4 //1.自定义输入路径 5 FileInputFormat.setInputPaths(job, new Path(args[0])); 6 //2.自定义mapper 7 //job.setInputFormatClass(TextInputFormat.class); 8 job.setMapperClass(MyMapper.class); 9 //job.setMapOutputKeyClass(Text.class); 10 //job.setMapOutputValueClass(TrafficWritable.class); 11 12 //3.自定义reduce 13 job.setReducerClass(MyReducer.class); 14 job.setOutputKeyClass(IntWritable.class); 15 job.setOutputValueClass(IntWritable.class); 16 //4.自定义输出路径 17 FileOutputFormat.setOutputPath(job, new Path(args[1])); 18 //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘 19 20 job.waitForCompletion(true); 21 }
由此,完整的代码如下:
package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Group-by-max MapReduce job: for tab-separated input lines of two integer
 * columns, outputs one record per distinct first-column value, paired with
 * the maximum second-column value seen for that key.
 */
public class GroupTest {
    //Job driver: args[0] = input path, args[1] = output path (must not exist).
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
        job.setJarByClass(GroupTest.class);
        //1. input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //2. mapper — map output types equal the final output types, so no
        //   explicit setMapOutputKeyClass/setMapOutputValueClass calls needed
        job.setMapperClass(MyMapper.class);
        //3. reducer and output key/value types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //4. output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //Fix: the original ignored the boolean returned by waitForCompletion,
        //so a failed job still exited with status 0. Propagate the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    //Mapper: parses each tab-separated line into <k2=col0, v2=col1> as ints.
    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private final IntWritable k2 = new IntWritable();
        private final IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(k2, v2);
        }
    }

    //Reducer: called once per group (once per distinct k2); emits the maximum
    //value in the group, i.e. <k2, max(v2s)>.
    private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private final IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                max = Math.max(max, v2.get());
            }
            //Each group contributes one maximum to the result sequence.
            v3.set(max);
            context.write(k2, v3);
        }
    }
}