1. 自定义GroupingComparator
1.1.需求:有如下订单
现在需要求出每一个订单中成交金额最大的一笔交易
1.2.分析:
1、利用“订单id和成交金额”组成的Bean作为key,在map阶段将读取到的所有订单数据按照订单id分区,并按照“订单id升序、成交金额降序”排序后发送到reduce端
2、在reduce端利用GroupingComparator将订单id相同的kv聚合成一组;由于同一订单内的记录已按成交金额降序排列,每组的第一个kv即为该订单中成交金额最大的一笔交易
定义订单信息bean,实现WritableComparable接口,并实现compareTo()方法用于排序
package cn.bigdata.hdfs.secondarySort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; /** * 订单信息bean,实现hadoop的序列化机制 */ public class OrderBean implements WritableComparable<OrderBean>{ private Text itemid; private DoubleWritable amount; public OrderBean() { } public OrderBean(Text itemid, DoubleWritable amount) { set(itemid, amount); } public void set(Text itemid, DoubleWritable amount) { this.itemid = itemid; this.amount = amount; } public Text getItemid() { return itemid; } public DoubleWritable getAmount() { return amount; } //1.模型必须实现Comparable<T>接口 /*2.Collections.sort(list);会自动调用compareTo,如果没有这句,list是不会排序的,也不会调用compareTo方法 3.如果是数组则用的是Arrays.sort(a)方法*/ //implements WritableComparable必须要实现的方法,用于比较排序 @Override public int compareTo(OrderBean o) { //根據ID排序 int cmp = this.itemid.compareTo(o.getItemid()); //id相同根据金额排序 if (cmp == 0) { cmp = -this.amount.compareTo(o.getAmount()); } return cmp; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(itemid.toString()); out.writeDouble(amount.get()); } @Override public void readFields(DataInput in) throws IOException { String readUTF = in.readUTF(); double readDouble = in.readDouble(); this.itemid = new Text(readUTF); this.amount= new DoubleWritable(readDouble); } @Override public String toString() { return itemid.toString() + "\t" + amount.get(); } }