1 省名、白天降雨量、晚上降雨量
2 广东省 262 513
3 黑龙江省 272 542
4 辽宁省 599 381
5 贵州省 426 267
6 陕西省 189 256
7 河北省 465 250
8 广东省 121 129
9 湖南省 127 491
10 湖南省 395 373
11 甘肃省 416 255
12 四川省 141 292
13 云南省 560 453
14 江苏省 234 229
15 江苏省 491 401
16 湖北省 556 490
17 江苏省 224 212
18 安徽省 126 488
19 四川省 104 122
20 陕西省 510 482
21 山东省 138 300
22 山西省 325 183
23 海南省 113 276
24 黑龙江省 108 570
25 黑龙江省 562 107
26 黑龙江省 269 201
27 江西省 118 474
28 湖北省 365 200
29 云南省 524 296
30 湖北省 509 515
31 山东省 584 523
32 江苏省 178 581
33 浙江省 588 264
34 江西省 261 374
35 黑龙江省 127 590
36 浙江省 366 574
37 云南省 287 479
38 海南省 501 595
39 吉林省 196 318
40 辽宁省 584 589
41 湖北省 337 433
42 河南省 112 338
43 广东省 241 561
44 甘肃省 113 342
45 山西省 283 585
46 山东省 275 563
47 青海省 170 172
48 江西省 327 403
49 湖南省 427 563
50 江西省 112 389
51 云南省 200 316
52 福建省 188 230
53 山西省 222 189
54 福建省 306 540
55 山西省 530 404
56 黑龙江省 176 406
57 陕西省 334 237
58 云南省 231 379
59 海南省 453 292
60 吉林省 481 404
61 广东省 598 295
62 吉林省 424 151
63 青海省 428 527
64 湖南省 364 517
65 湖南省 400 473
66 福建省 485 540
67 江苏省 160 395
68 湖北省 289 336
69 贵州省 317 197
70 云南省 184 574
71 广东省 289 348
72 山西省 137 535
73 甘肃省 112 357
74 云南省 205 595
75 贵州省 564 239
76 湖南省 313 148
77 河北省 366 194
78 辽宁省 378 336
79 云南省 510 532
80 甘肃省 371 216
81 青海省 242 172
82 四川省 353 342
83 江苏省 410 110
84 吉林省 272 245
85 江苏省 409 426
86 安徽省 567 388
87 四川省 358 501
88 辽宁省 576 216
89 四川省 384 580
90 甘肃省 282 326
91 湖北省 450 133
92 四川省 167 415
93 河南省 466 144
94 山东省 182 505
95 江西省 243 411
96 甘肃省 157 262
97 河南省 401 418
98 山西省 294 214
99 贵州省 116 413
100 河南省 462 207
101 山东省 377 519
102 1)编写MapReduce完成降雨量求和。其中Mapper和Reducer(8分),Runner(2分)
103 2)编写MapReduce完成省降雨量倒序排序。其其中Mapper和Reducer(8分),Runner(2分)
104 3)将任务打包到Linux中运行,并查看运行结果。(4分)
1 package com.liuhuan;
2
3 import org.apache.hadoop.io.WritableComparable;
4
5 import java.io.DataInput;
6 import java.io.DataOutput;
7 import java.io.IOException;
8
9 public class RainBean implements WritableComparable<RainBean> {
10
11 private String city;
12 private int num;
13
14 @Override
15 public int compareTo(RainBean o) {
16 return o.num - this.num;
17 }
18
19 @Override
20 public void write(DataOutput dataOutput) throws IOException {
21 dataOutput.writeUTF(city);
22 dataOutput.writeInt(num);
23 }
24
25 @Override
26 public void readFields(DataInput dataInput) throws IOException {
27 this.city = dataInput.readUTF();
28 this.num = dataInput.readInt();
29 }
30
31 @Override
32 public String toString() {
33 return city + "\t" + num;
34 }
35
36 public String getCity() {
37 return city;
38 }
39
40 public void setCity(String city) {
41 this.city = city;
42 }
43
44 public int getNum() {
45 return num;
46 }
47
48 public void setNum(int num) {
49 this.num = num;
50 }
51 }
1 package com.liuhuan;
2
3 import org.apache.hadoop.io.IntWritable;
4 import org.apache.hadoop.io.LongWritable;
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.mapreduce.Mapper;
7
8 import java.io.IOException;
9
10 public class RainMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
11 Text text = new Text();
12 IntWritable sum = new IntWritable();
13
14 @Override
15 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
16 int num = 0;
17 String[] split = value.toString().split("\t");
18 text.set(split[0]);
19 num += Integer.parseInt(split[1]) + Integer.parseInt(split[2]);
20 sum.set(num);
21 context.write(text,sum);
22 }
23 }
1 package com.liuhuan;
2
3 import org.apache.hadoop.io.IntWritable;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapreduce.Reducer;
6
7 import java.io.IOException;
8
9 public class RainReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
10 IntWritable sum = new IntWritable();
11
12 @Override
13 protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
14 int num = 0;
15 for (IntWritable value : values) {
16 num += value.get();
17 }
18 sum.set(num);
19 context.write(key,sum);
20 }
21 }
1 package com.liuhuan;
2
3 import org.apache.hadoop.io.LongWritable;
4 import org.apache.hadoop.io.NullWritable;
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.mapreduce.Mapper;
7
8 import java.io.IOException;
9
10 public class RainTwoMapper extends Mapper<LongWritable, Text, RainBean, NullWritable> {
11
12 RainBean rainBean = new RainBean();
13
14 @Override
15 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
16 String[] split = value.toString().split("\t");
17 rainBean.setCity(split[0]);
18 rainBean.setNum(Integer.parseInt(split[1]));
19 context.write(rainBean,NullWritable.get());
20 }
21 }
1 package com.liuhuan;
2
3 import org.apache.hadoop.io.NullWritable;
4 import org.apache.hadoop.mapreduce.Reducer;
5
6 import java.io.IOException;
7
8 public class RainTwoReduce extends Reducer<RainBean, NullWritable, RainBean, NullWritable> {
9
10 @Override
11 protected void reduce(RainBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
12 context.write(key,NullWritable.get());
13 }
14 }
1 package com.liuhuan;
2
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.fs.Path;
5 import org.apache.hadoop.io.IntWritable;
6 import org.apache.hadoop.io.NullWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Job;
9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11
12 import java.io.IOException;
13
14 public class RainDriver {
15
16 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
17 Configuration cfg = new Configuration();
18 Job job = Job.getInstance(cfg);
19 job.setJobName("job1");
20
21 job.setJarByClass(RainDriver.class);
22 job.setMapperClass(RainMapper.class);
23 job.setReducerClass(RainReduce.class);
24
25 job.setMapOutputKeyClass(Text.class);
26 job.setMapOutputValueClass(IntWritable.class);
27 job.setOutputKeyClass(Text.class);
28 job.setOutputValueClass(IntWritable.class);
29
30 FileInputFormat.setInputPaths(job,new Path(args[0]));
31 FileOutputFormat.setOutputPath(job,new Path(args[1]));
32
33 boolean b = job.waitForCompletion(true);
34
35 if (b){
36 Job job2 = Job.getInstance(cfg);
37 job2.setJobName("job2");
38
39 job2.setJarByClass(RainDriver.class);
40 job2.setMapperClass(RainTwoMapper.class);
41 job2.setReducerClass(RainTwoReduce.class);
42
43 job2.setMapOutputKeyClass(RainBean.class);
44 job2.setMapOutputValueClass(NullWritable.class);
45 job2.setOutputKeyClass(RainBean.class);
46 job2.setOutputValueClass(NullWritable.class);
47
48 FileInputFormat.setInputPaths(job2,new Path(args[1]));
49 FileOutputFormat.setOutputPath(job2,new Path(args[2]));
50
51 boolean b1 = job2.waitForCompletion(true);
52 System.out.println(b1);
53 }
54
55 }
56 }