/** Partition keys by their {@link Object#hashCode()}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
传入key和value,获取一个组号:
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
默认numReduceTasks为1,求余数之后为0,所以对于所有的key都只会有一个组号。不会对数据进行分组操作。比如上篇文章中的数据,可以全部由一个reduce处理。
二:对流量实现自定义分组(按照省份)
(一)实验目标
对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件中
(二)实现思路---自定义改造两个机制
1.自定义一个partitioner,改造分区的逻辑。
2.自定义reducer task的并发任务数
(三)对于我们设置的分组,我们应该设置大于等于分组数的Reduce数量才对
三:代码实现
(一)自定义数据结构
package cn.hadoop.dg; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean> { private String phoneNB; private long up_flow; private long down_flow; private long sum_flow; public FlowBean() {} //无参构造函数,用于反序列化时使用 public FlowBean(String phoneNB, long up_flow, long down_flow) { this.phoneNB = phoneNB; this.up_flow = up_flow; this.down_flow = down_flow; this.sum_flow = up_flow + down_flow; } public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUp_flow() { return up_flow; } public void setUp_flow(long up_flow) { this.up_flow = up_flow; } public long getDown_flow() { return down_flow; } public void setDown_flow(long down_flow) { this.down_flow = down_flow; } public long getSum_flow() { return up_flow + down_flow; } //用于序列化 @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(phoneNB); out.writeLong(up_flow); out.writeLong(down_flow); out.writeLong(up_flow+down_flow); } //用于反序列化 @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub phoneNB = in.readUTF(); up_flow = in.readLong(); down_flow = in.readLong(); sum_flow = in.readLong(); } @Override public int compareTo(FlowBean o) { return sum_flow > o.sum_flow ? -1 : 1; //返回值为-1,则排在前面 } @Override public String toString() { return "" + up_flow + "\t" + down_flow + "\t"+ sum_flow; } }