将手机用户的流量使用数据进行分组和排序;
测试准备:
首先同步各节点的时间,然后在master上先启动HDFS集群,再启动YARN集群;用jps查看进程:
master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;
slave上: 先有DataNode;再有NodeManager;
如果master上的HDFS和YARN都启动成功,但部分slave节点上的进程没有启动,可以在对应的slave节点上使用如下命令手动启动:
```
hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager
```
然后在本地"/home/hadoop/test/"目录创建phoneflow文件夹,将所有需要统计的数据放到该文件夹下;
测试目标:
目标一:输出结果是:按手机号分组后,按照上传流量与下载流量之和从大到小排序的结果;
目标二:输出结果是:按手机号分组后,先按照上传流量排序,上传流量相同时再按照上传流量与下载流量之和排序;
测试代码:
目标一:
因为涉及到了排序,我们输出的结果是一个包装好的flow对象(它自身就包含了很多信息);
分组必须要让Flow类实现Serializable接口(注:Hadoop在shuffle过程中实际使用的是Writable序列化机制,即下面的write/readFields方法);
排序则要求Flow类在此基础上再实现WritableComparable接口,并重写write、readFields和compareTo方法;
1 package com.mmzs.bigdata.yarn.mapreduce; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.io.Serializable; 7 8 import org.apache.hadoop.io.WritableComparable; 9 10 public class Flow implements WritableComparable<Flow>,Serializable{ 11 12 private String phoneNum;//手机号 13 private Long upFlow; //上传流量 14 private Long downFlow; //下载流量 15 public Flow() {} 16 public Flow(String phoneNum, Long upFlow, Long downFlow) { 17 super(); 18 this.phoneNum = phoneNum; 19 this.upFlow = upFlow; 20 this.downFlow = downFlow; 21 } 22 public Long getTotalFlow() { 23 return upFlow+downFlow; 24 } 25 26 27 //按照怎样的顺序写入到reduce中,在reduce中就按照怎样的顺序读 28 //write是一个序列化的过程 29 @Override 30 public void write(DataOutput out) throws IOException { 31 out.writeUTF(phoneNum); 32 out.writeLong(upFlow); 33 out.writeLong(downFlow); 34 } 35 //read是一个反序列化的过程 36 @Override 37 public void readFields(DataInput in) throws IOException { 38 this.phoneNum = in.readUTF(); 39 this.upFlow = in.readLong(); 40 this.downFlow = in.readLong(); 41 } 42 //reduce任务排序的依据 43 @Override 44 public int compareTo(Flow flow) { 45 Long curTotalFlow = this.getTotalFlow(); 46 Long paramTotalFlow = flow.getTotalFlow(); 47 Long resFlow = curTotalFlow-paramTotalFlow; 48 return resFlow>0?-1:1; 49 } 50 51 52 public String getPhoneNum() { 53 return phoneNum; 54 } 55 public void setPhoneNum(String phoneNum) { 56 this.phoneNum = phoneNum; 57 } 58 public Long getUpFlow() { 59 return upFlow; 60 } 61 public void setUpFlow(Long upFlow) { 62 this.upFlow = upFlow; 63 } 64 public Long getDownFlow() { 65 return downFlow; 66 } 67 public void setDownFlow(Long downFlow) { 68 this.downFlow = downFlow; 69 } 70 //此方法只是单纯的为了方便一次性设置值,只set一次 71 public void setFlow(String phoneNum, Long upFlow, Long downFlow) { 72 this.phoneNum = phoneNum; 73 this.upFlow = upFlow; 74 this.downFlow = downFlow; 75 } 76 @Override 77 public String toString() { 78 return new StringBuilder(phoneNum).append("\t") 79 
.append(upFlow).append("\t") 80 .append(downFlow).append("\t") 81 .append(getTotalFlow()) 82 .toString(); 83 } 84 85 }