/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

传入key和value,获取一个组号:

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

默认numReduceTasks为1,求余数之后为0,所以对于所有的key都只会有一个组号。不会对数据进行分组操作。比如上篇文章中的数据,可以全部由一个reduce处理。

二:对流量实现自定义分组(按照省份)

(一)实验目标

对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件中

(二)实现思路---自定义改造两个机制

1.自定义一个partitioner,改造分区的逻辑。

2.自定义reducer task的并发任务数

(三)对于我们设置的分组,我们应该设置大于等于分组数的Reduce数量才对

三:代码实现

Hadoop基础---MapReduce程序实现自定义分组(进一步理解Hadoop机制---实现reduce task并发)

(一)自定义数据结构

package cn.hadoop.dg;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNB;
    private long up_flow;
    private long down_flow;
    private long sum_flow;
    
    public FlowBean() {}    //无参构造函数,用于反序列化时使用

    public FlowBean(String phoneNB, long up_flow, long down_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.sum_flow = up_flow + down_flow;
    }
    

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getDown_flow() {
        return down_flow;
    }

    public void setDown_flow(long down_flow) {
        this.down_flow = down_flow;
    }

    public long getSum_flow() {
        return up_flow + down_flow;
    }


    //用于序列化
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(up_flow+down_flow);
    }
    
    //用于反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        sum_flow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        return sum_flow > o.sum_flow ? -1 : 1;    //返回值为-1,则排在前面
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + down_flow + "\t"+ sum_flow;
    }

    
}
FlowBean

相关文章:

  • 2022-01-05
  • 2022-12-23
  • 2021-07-21
  • 2022-03-01
  • 2021-07-27
  • 2022-12-23
  • 2021-06-21
猜你喜欢
  • 2022-12-23
  • 2021-09-11
  • 2021-08-05
  • 2021-05-19
  • 2022-12-23
  • 2022-12-23
  • 2021-11-22
相关资源
相似解决方案