•  Grouping: values that share the same key are collected into one group.

 Example: in the input/output below, the first column of the output contains no duplicate values, and the second column holds the maximum of the second-column values for each distinct first-column value.

          (Figure: MapReduce grouping, input/output example)

Analysis: first determine <k3,v3>. There are two ways to choose k3:

Method 1: use both columns together as k3.

Method 2: use the first column as k3 and the second as v3. In that case k2 is the first column and v2 is the second; the question is how to turn <k2, v2s> into <k3,v3>, and the answer is to take the maximum value from v2s for each group.
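Independent of Hadoop, the group-then-aggregate behavior that Method 2 relies on can be sketched in plain Java. The sample pairs used here are an assumption, reconstructed from the grouping 3-{3,2,1}, 2-{2,1}, 1-{1} mentioned later in this article:

```java
import java.util.*;

// Plain-Java sketch of Method 2: group by the first column, keep the max of the second.
// This mimics what the shuffle (grouping by k2) plus the reducer (max over v2s) do together.
class MaxPerKeySketch {
    static Map<Integer, Integer> maxPerKey(List<int[]> pairs) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (int[] p : pairs) {
            // p[0] plays the role of k2, p[1] of v2; keep the running maximum per key
            result.merge(p[0], p[1], Math::max);
        }
        return result;
    }
}
```

With input pairs (3,3), (3,2), (3,1), (2,2), (2,1), (1,1), this yields 1→1, 2→2, 3→3, which is exactly the per-key maximum the job should produce.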

Part 1: Accomplishing the task with Method 2

(1) Custom Mapper

private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    IntWritable k2 = new IntWritable();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Each input line is "<col1>\t<col2>"; emit <col1> as k2 and <col2> as v2
        String[] splited = value.toString().split("\t");
        k2.set(Integer.parseInt(splited[0]));
        v2.set(Integer.parseInt(splited[1]));
        context.write(k2, v2);
    }
}
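The mapper's only real work is parsing. That split-and-parse step can be checked in isolation with plain Java, no Hadoop types needed (the class and method names here are illustrative, not part of the job above):

```java
// Stand-alone version of the mapper's parsing step:
// split a tab-separated line into its two integer columns (k2, v2).
class LineParser {
    static int[] parseLine(String line) {
        String[] splited = line.split("\t");
        return new int[]{Integer.parseInt(splited[0]), Integer.parseInt(splited[1])};
    }
}
```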

(2) Custom Reducer

//Records are sorted and grouped by k2 before reduce (3 groups here, so reduce is called 3 times: once per group)
//Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}

private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable v3 = new IntWritable();

    @Override
    protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v2 : v2s) {
            if (v2.get() > max) {
                max = v2.get();
            }
        }
        // One maximum per group yields the rows of the result
        v3.set(max);
        context.write(k2, v3);
    }
}
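The reducer body can be exercised on a single group in isolation. The sketch below uses plain ints instead of IntWritable and feeds it the groups from the comment above; it is a simplified stand-in, not the Hadoop-invoked method:

```java
// Stand-alone version of the reducer's loop:
// scan one group's values and return the maximum, as the reducer does per k2.
class ReduceSketch {
    static int maxOfGroup(Iterable<Integer> v2s) {
        int max = Integer.MIN_VALUE;
        for (int v2 : v2s) {
            if (v2 > max) {
                max = v2;
            }
        }
        return max;
    }
}
```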

(3) Assembling the MapReduce job

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
    job.setJarByClass(GroupTest.class);
    //1. set the input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    //2. set the custom mapper
    //job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MyMapper.class);
    //job.setMapOutputKeyClass(IntWritable.class);
    //job.setMapOutputValueClass(IntWritable.class);

    //3. set the custom reducer
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    //4. set the output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //job.setOutputFormatClass(TextOutputFormat.class);// formats the output and writes it to disk

    job.waitForCompletion(true);
}

Putting the pieces together, the complete code is:

package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTest {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
        job.setJarByClass(GroupTest.class);
        //1. set the input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //2. set the custom mapper
        //job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        //job.setMapOutputKeyClass(IntWritable.class);
        //job.setMapOutputValueClass(IntWritable.class);

        //3. set the custom reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //4. set the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //job.setOutputFormatClass(TextOutputFormat.class);// formats the output and writes it to disk

        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        IntWritable k2 = new IntWritable();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Each input line is "<col1>\t<col2>"; emit <col1> as k2 and <col2> as v2
            String[] splited = value.toString().split("\t");
            k2.set(Integer.parseInt(splited[0]));
            v2.set(Integer.parseInt(splited[1]));
            context.write(k2, v2);
        }
    }

    //Records are sorted and grouped by k2 before reduce (3 groups here, so reduce is called 3 times: once per group)
    //Groups: 3-{3,2,1}, 2-{2,1}, 1-{1}
    private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        IntWritable v3 = new IntWritable();

        @Override
        protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
                Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable v2 : v2s) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            // One maximum per group yields the rows of the result
            v3.set(max);
            context.write(k2, v3);
        }
    }
}
The complete MapReduce code for finding the maximum value per key.
