• Running MR locally
    1. Install a local Hadoop environment: put hadoop.dll and winutils.exe into Hadoop's bin directory and configure the environment variables ---- (a local Hadoop install is required to run MR programs locally)

    System.setProperty("HADOOP_USER_NAME", "root");            //run as the root user
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1");     //point at the local Hadoop install directory
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");                 //run the job in local mode

 

    2. If the following error appears, copy the whole org package into the Java source directory: Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    3. Set the path parameters (forward and back slashes both work)
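The settings above fit into a minimal local-mode driver like the sketch below. This is an assumption-laden illustration, not code from the notes: WordCountMapper, WordCountReducer, and the input/output paths are all placeholders.

```java
//Hedged sketch of a local-mode driver; mapper/reducer classes and paths are placeholders.
public class LocalDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1");
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");  //run in-process, no cluster
        conf.set("fs.defaultFS", "file:///");           //read and write the local filesystem
        Job job = Job.getInstance(conf);
        job.setMapperClass(WordCountMapper.class);      //placeholder mapper
        job.setReducerClass(WordCountReducer.class);    //placeholder reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("d:/input"));
        FileOutputFormat.setOutputPath(job, new Path("d:/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```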

 

Practical notes on MapReduce

 

Add a log4j configuration file to make debugging easier.

 

Problem encountered: the jdk.tools package cannot be resolved; declare it explicitly:

      <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
          </dependency>

 

  • MapTask execution flow (in detail)

Core MapReduce topics to master:

1) Job submission flow

2) MapReduce execution process

3) Shuffle (from the map-side circular buffer to the local merge on the reduce side)

     (Map-side shuffle: write into the circular buffer, partition and sort in memory, spill to disk, then merge-sort the spill files (external merge sort), with an optional combiner run during the merge)

     (Reduce-side shuffle: mainly copy and sort (merge); it does not include the reduce aggregation itself)

 

4) Six points you can intervene in: input/output (InputFormat/OutputFormat), partitioning (tied to the number of reducers), sorting (lexicographic by default), grouping, and combine
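The six hook points map one-to-one onto Job setters. A hedged sketch (every class name below is a placeholder for your own implementation, not code from the notes):

```java
//Sketch only: each class is a placeholder.
job.setInputFormatClass(TextInputFormat.class);              //1. input
job.setOutputFormatClass(TextOutputFormat.class);            //2. output
job.setPartitionerClass(ProvincePartitioner.class);          //3. partition (pair with setNumReduceTasks)
job.setSortComparatorClass(MySortComparator.class);          //4. sort (default: lexicographic key order)
job.setGroupingComparatorClass(MyGroupingComparator.class);  //5. grouping
job.setCombinerClass(MyCombiner.class);                      //6. combine
```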

 

  • Custom combiner

A combiner (a map-side mini-reduce) must never change the final result. Computing an average, for example, goes wrong with a combiner, so make sure to remove it in such cases. The combiner's input and output types must stay identical to the map output type / reduce input type!
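The averaging pitfall can be shown without Hadoop at all: averaging per-split averages is not the same as averaging all values when the splits have different sizes. A plain-Java demonstration (all names are illustrative):

```java
//Demonstrates why a combiner must not change the result:
//averaging is not associative across unevenly sized groups.
public class AvgCombinerPitfall {
    //direct average over all values
    static double directAvg(double[] v) {
        double s = 0;
        for (double x : v) s += x;
        return s / v.length;
    }
    //"combiner-style" average: average of the per-split averages
    static double avgOfAvgs(double[] split1, double[] split2) {
        return (directAvg(split1) + directAvg(split2)) / 2;
    }
    public static void main(String[] args) {
        double[] a = {1, 2, 3};   //one map split
        double[] b = {10};        //another, smaller split
        System.out.println(directAvg(new double[]{1, 2, 3, 10})); //4.0
        System.out.println(avgOfAvgs(a, b));                      //6.0: wrong!
    }
}
```

A sum-count pair, by contrast, is associative, which is why averages are usually computed by emitting (sum, count) and only dividing in the reducer.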

4. Custom partitioner:

Partitioning happens when the map-side buffer is written to disk, so the partitioner's input types are the map output types!

<Text, FlowBean> Records are dispatched to reducers by applying a rule to the map output.


public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    //simulated data dictionary
    private static HashMap<String, Integer> pmap = new HashMap<>();

    static {
        //assume prefix 136 is province 0, handled by reduce task 0
        pmap.put("136", 0);
        pmap.put("137", 1);
        pmap.put("138", 2);
        pmap.put("139", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean flowBean, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer partNum = pmap.get(prefix);
        return (partNum == null) ? 4 : partNum;   //unknown prefixes go to reduce task 4
    }
}

 

Using the custom partitioner in the job:

//register the custom data-dispatch rule
     job.setPartitionerClass(ProvincePartitioner.class);

 //the number of reducers must cover every partition number (0-4 here, so 5)
     job.setNumReduceTasks(5);

 

---------------------------------

5. Custom JavaBean implementing the serialization-and-comparison interface:

public class RateBean implements WritableComparable<RateBean>

 

Deserialization uses reflection, so a no-arg constructor is required.

@Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.movie);        //writeUTF writes a String field
        out.writeUTF(this.rate);
        out.writeUTF(this.timeStamp);
        out.writeUTF(this.uid);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        //fields must be read back in exactly the order they were written
        movie = in.readUTF();
        rate = in.readUTF();
        timeStamp = in.readUTF();
        uid = in.readUTF();
    }

 

 

@Override
    public int compareTo(RateBean o) {
        return -this.rate.compareTo(o.rate);   //the "-" reverses the order (descending); the other object's field is accessed via o
    }

 

 

Invoking the sort:

List<RateBean> list = new ArrayList<>();

Collections.sort(list);
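The negated compareTo can be exercised in plain Java without Hadoop. A minimal analogue of RateBean (class and field names are illustrative, not from the notes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

//Minimal plain-Java analogue of RateBean's compareTo: negating the
//comparison yields descending order.
public class RateSortDemo {
    static class Rate implements Comparable<Rate> {
        final String rate;   //stored as a String, as in the notes
        Rate(String rate) { this.rate = rate; }
        @Override
        public int compareTo(Rate o) {
            return -this.rate.compareTo(o.rate);   //"-" reverses the order
        }
    }

    //sort the given rates descending and return them as strings
    static List<String> sorted(String... rates) {
        List<Rate> list = new ArrayList<>();
        for (String r : rates) list.add(new Rate(r));
        Collections.sort(list);                    //uses compareTo
        List<String> out = new ArrayList<>();
        for (Rate r : list) out.add(r.rate);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sorted("3", "5", "1")); //[5, 3, 1]
    }
}
```

Note that because rate is compared as a String, "10" sorts before "9" lexicographically; numeric fields should be parsed before comparing.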

---------

 

Configuration conf = new Configuration();
        //custom record (line) delimiter
        //conf.set("textinputformat.record.delimiter", ";");

Set the number of reducers:

            //job.setNumReduceTasks(10);

 

 

 //convert one incoming JSON string into a Java bean (Jackson)

        ObjectMapper objectMapper = new ObjectMapper();

        RateBean rateBean = objectMapper.readValue(value.toString(),RateBean.class);

 

 

Setting conf parameters in the Job code:

Configuration conf = new Configuration();
        //pass a parameter to the tasks, e.g.
        //conf.set("topN", "10");

 

Retrieving it inside map/reduce:

Configuration conf = context.getConfiguration();
    int topN = conf.getInt("topN", 5);    //5 is the default when topN is unset

 

 

//set the grouping comparator

     job.setGroupingComparatorClass(RateGroupingComparetor.class);
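The notes only show the registration call, not the comparator itself. A hedged sketch of what RateGroupingComparetor might contain, assuming RateBean exposes a hypothetical getMovie() accessor: records whose movie id compares equal land in the same reduce() call, regardless of the rest of the composite key.

```java
//Sketch only; getMovie() is a hypothetical accessor on RateBean.
public class RateGroupingComparetor extends WritableComparator {
    protected RateGroupingComparetor() {
        super(RateBean.class, true);   //true: create key instances for comparing
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        RateBean x = (RateBean) a;
        RateBean y = (RateBean) b;
        return x.getMovie().compareTo(y.getMovie()); //equal => same group
    }
}
```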

 

 

Map-side join:

//add the small tables to the distributed cache
   job.setCacheFiles(new URI[]{new Path("/data/user").toUri(),
                           new Path("/data/sex").toUri()});

//read the cache on the map side
protected void setup(Context context) throws IOException, InterruptedException {
        //fetch the distributed-cache files
        URI[] cacheFiles = context.getCacheFiles();
        BufferedReader br = null;
        for (URI p : cacheFiles) {
            //get the path name
            String pathName = p.getPath();
            if (pathName.equals("/data/sex")) {
                //the cached file is localized under its own file name
                br = new BufferedReader(new FileReader(new File("sex")));
                String str = null;
                while ((str = br.readLine()) != null) {
                    String[] strs = str.split("\t");
                    sexMap.put(strs[0], strs[1]);     //load into the in-memory map
                }
                br.close();
            }
        }
    }

 

//used inside the map method:

        String uname = userMap.get(uid);

        String sexName = sexMap.get(sexId);
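The lookup half of the join works the same without Hadoop: load the small dimension table into an in-memory map once, then enrich each record with a constant-time get. A plain-Java illustration (all names and values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

//Plain-Java illustration of the map-side join pattern: the small table is
//loaded once (standing in for setup()), then each record is enriched by a
//constant-time lookup (standing in for the work inside map()).
public class MapJoinDemo {
    static Map<String, String> sexMap = new HashMap<>();

    //join one record against the in-memory table
    static String enrich(String uid, String sexId) {
        return uid + "\t" + sexMap.getOrDefault(sexId, "unknown");
    }

    public static void main(String[] args) {
        //stands in for reading the cached "sex" file in setup()
        sexMap.put("0", "female");
        sexMap.put("1", "male");
        System.out.println(enrich("u01", "1"));
    }
}
```

This avoids the shuffle entirely, which is the point of a map-side join: it only works when one side of the join is small enough to fit in each task's memory.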
