Group and sort the data-traffic records of mobile phone users;

 

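Test preparation:

First synchronize the clocks on all nodes, then on master start the HDFS cluster, followed by the YARN cluster. Check the running daemons with jps:

On master: NameNode and SecondaryNameNode appear first, then ResourceManager;

On the slaves: DataNode appears first, then NodeManager;

If HDFS and YARN start successfully on master but fail on some slave nodes, start the missing daemons manually on those nodes with the following commands: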

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

Then create a phoneflow folder under the local directory "/home/hadoop/test/" and put all the data to be aggregated into that folder;

MapReduce test on a YARN cluster (Part 4)

 

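Test goals:

 Goal 1: the output is grouped by phone number, then sorted by the sum of upload and download traffic;

 Goal 2: the output is grouped by phone number, then sorted by upload traffic first; when the upload traffic is equal, sorted by the sum of upload and download traffic;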
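The two goals differ only in the ordering rule, which can be sketched in plain Java without any Hadoop dependency. This is a minimal illustration, not the job itself: the class `FlowOrderingSketch`, the record type `Usage` and the sample values in `main` are hypothetical, and descending order is an assumption taken from the `compareTo` shown later in this post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlowOrderingSketch {

    /** Hypothetical in-memory record: a phone number with its upload and download traffic. */
    public static final class Usage {
        public final String phoneNum;
        public final long upFlow;
        public final long downFlow;

        public Usage(String phoneNum, long upFlow, long downFlow) {
            this.phoneNum = phoneNum;
            this.upFlow = upFlow;
            this.downFlow = downFlow;
        }

        public long total() {
            return upFlow + downFlow;
        }
    }

    /** Grouping step: sums upload and download traffic per phone number. */
    public static List<Usage> groupByPhone(List<Usage> raw) {
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (Usage u : raw) {
            long[] s = sums.computeIfAbsent(u.phoneNum, k -> new long[2]);
            s[0] += u.upFlow;
            s[1] += u.downFlow;
        }
        List<Usage> grouped = new ArrayList<>();
        for (Map.Entry<String, long[]> e : sums.entrySet()) {
            grouped.add(new Usage(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return grouped;
    }

    /** Goal 1: largest total traffic first (descending order is an assumption). */
    public static final Comparator<Usage> BY_TOTAL_DESC =
            (a, b) -> Long.compare(b.total(), a.total());

    /** Goal 2: largest upload traffic first; ties are broken by total traffic. */
    public static final Comparator<Usage> BY_UP_THEN_TOTAL_DESC = (a, b) -> {
        int byUp = Long.compare(b.upFlow, a.upFlow);
        return byUp != 0 ? byUp : Long.compare(b.total(), a.total());
    };

    public static void main(String[] args) {
        // Made-up sample records, for illustration only
        List<Usage> raw = new ArrayList<>();
        raw.add(new Usage("A", 10, 5));
        raw.add(new Usage("B", 20, 50));
        raw.add(new Usage("A", 10, 5));
        raw.add(new Usage("C", 30, 0));

        List<Usage> grouped = groupByPhone(raw);
        grouped.sort(BY_TOTAL_DESC);
        for (Usage u : grouped) {
            System.out.println(u.phoneNum + "\t" + u.upFlow + "\t" + u.downFlow + "\t" + u.total());
        }
    }
}
```

In the actual job the grouping is done by the shuffle and the ordering by the key's `compareTo`; the sketch only makes the two comparison rules explicit.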

 

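Test code

Goal 1:

Because sorting is involved, the output is a wrapped Flow object (it carries all the fields we need);

For grouping, the Flow class here implements the Serializable interface (note that Hadoop itself serializes keys and values through the Writable interface rather than java.io.Serializable);

For sorting, the Flow class must additionally implement the WritableComparable interface and override the write, readFields and compareTo methods;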

package com.mmzs.bigdata.yarn.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.io.WritableComparable;

public class Flow implements WritableComparable<Flow>, Serializable {

    private String phoneNum; // phone number
    private Long upFlow;     // upload traffic
    private Long downFlow;   // download traffic

    // No-arg constructor, needed so the framework can create instances before readFields
    public Flow() {}

    public Flow(String phoneNum, Long upFlow, Long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public Long getTotalFlow() {
        return upFlow + downFlow;
    }

    // Fields must be read back in exactly the order in which they are written here
    // write is the serialization step
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // readFields is the deserialization step
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNum = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    // Sort order applied to keys before they reach reduce: larger total traffic first
    @Override
    public int compareTo(Flow flow) {
        int byTotal = Long.compare(flow.getTotalFlow(), this.getTotalFlow());
        // Tie-break on the phone number so that 0 is returned only for the same phone;
        // this keeps the compareTo contract and keeps distinct phones with equal
        // totals in separate groups
        return byTotal != 0 ? byTotal : this.phoneNum.compareTo(flow.phoneNum);
    }

    public String getPhoneNum() {
        return phoneNum;
    }
    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }
    public Long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }
    public Long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    // Convenience method for setting all fields in a single call
    public void setFlow(String phoneNum, Long upFlow, Long downFlow) {
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    // Tab-separated output line: phone number, upload, download, total
    @Override
    public String toString() {
        return new StringBuilder(phoneNum).append("\t")
                .append(upFlow).append("\t")
                .append(downFlow).append("\t")
                .append(getTotalFlow())
                .toString();
    }

}
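The write/readFields pair can be exercised outside a cluster, because both methods only depend on the standard java.io.DataOutput and java.io.DataInput interfaces. The following is a minimal sketch under that assumption: `FlowRoundTripSketch` and `FlowRecord` are hypothetical stand-ins that mirror the three fields of Flow without the Hadoop dependency, and the sample values are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class FlowRoundTripSketch {

    /** Hypothetical stand-in for Flow, with the same field order but no Hadoop types. */
    public static final class FlowRecord {
        public String phoneNum;
        public long upFlow;
        public long downFlow;

        // Serialization: phone number, then upload, then download
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNum);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
        }

        // Deserialization: must read the fields in the same order as write
        public void readFields(DataInput in) throws IOException {
            phoneNum = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
        }
    }

    /** Serializes a record into a byte array and reads it back into a fresh instance. */
    public static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            FlowRecord copy = new FlowRecord();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord record = new FlowRecord();
        record.phoneNum = "13800000000"; // made-up sample value
        record.upFlow = 120L;
        record.downFlow = 300L;

        FlowRecord copy = roundTrip(record);
        System.out.println(copy.phoneNum + "\t" + copy.upFlow + "\t" + copy.downFlow);
    }
}
```

If readFields read the two long values in the opposite order, the round trip would still succeed but upload and download traffic would be silently swapped, which is why the order in write and readFields has to match.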
