前言

  前面的一篇给大家写了一些MapReduce的一些程序,像去重、词频统计、统计分数、共现次数等。这一篇给大家介绍的是关于Combiner优化操作。

1.1、为什么需要Combiner

  我们map任务处理的结果是存放在运行map任务的节点上。
  map处理的数据的结果在进入reduce的时候,reduce会通过远程的方式去获取数据。
  在map处理完数据之后,数据量特别大的话。reduce再去处理数据它就要通过网络去获取很多的数据。
  这样会导致一个问题是:大量的数据会对网络带宽造成一定的影响。


  有没有一种方式能够类似reduce一样,在map端处理完数据之后,然后在reduce端进行一次简单的数据处理?
    MapReudce正常处理是:

      map处理完,中间结果存放在map节点上。reduce处理的数据通过网络形式拿到reduce所在的节点上。
      如果我们能够在map端进行一次类似于reduce的操作,这样会使进入reduce的数据就会少很多。

  我们把在map端所执行的类似于reduce的操作成为Combiner。

1.2、Combiner介绍

  1) 前提

    每一个map都可能会产生大量的本地输出
  2)Combiner功能

    对map端的输出先做一次合并
  3)目的

     减少在map和reduce节点之间的数据传输量, 以提高网络IO性能。

二、使用Combiner优化Mapduce执行

2.1、使用前提

  不能对最原始的map的数据流向reduce造成影响。也就是说map端进入reduce的数据不收Combiner的影响。

  数据输入的键值类型和数据输出的键值类型一样的reduce我们可以把它当做Combiner来使用

  Hadoop(十六)之使用Combiner优化MapReduce

  举例:

    我们前面一篇博客中有一个处理的是求用户的好友列表的数据。

    我们之后进入map端的数据类型为LongWritable,Text,而map端输出的数据类型为Text,Text(用户,好友),进入reduce之后reduce的输入类型为Text,Text, 

    最后reduce的输出也是Text,Text(用户,好友列表)。   

    这样总结:

      reduce的输入类型等于reduce输出的数据类型,这样符合Combiner的情况。(这样我们就不需要去自定义数据类型了)

2.2、怎么使用

  其实Combiner的本质就是一个reducer,那我们要写Combiner我们就要继承reducer。

  下面写一个例子,首先你需要了解我前面写的一个专利引用的例子,才能了解专利文件数据格式。

  需求:求这个专利以及这个专利它引用了哪些专利。

Hadoop(十六)之使用Combiner优化MapReduce
import com.briup.bd1702.hadoop.mapred.utils.PatentRecordParser;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PatentReferenceWithCombiner_0010 extends Configured implements Tool{
    public static class PatentReferenceMapper extends Mapper<LongWritable,Text,Text,Text>{
        private PatentRecordParser parser=new PatentRecordParser();
        private Text key=new Text();
        private Text value=new Text();
        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            parser.parse(value);
            if(parser.isValid()){
                this.key.set(parser.getPatentId());
                this.value.set(parser.getRefPatentId());
                context.write(this.key,this.value);
            }
        }
    }

    public static class PatentReferenceReducer extends Reducer<Text,Text,Text,Text>{
        private Text value=new Text();
        @Override
        protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            StringBuffer refPatentIds=null;

            for(Text value:values){
                refPatentIds.append(value.toString()+",");
            }
            this.value.set(refPatentIds.toString());
            context.write(key,value);
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));
        // 构建Job对象,并设置驱动类名和Job名,用于提交作业
        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 给Job设置Mapper类以及map方法输出的键值类型
        job.setMapperClass(PatentReferenceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 给Job设置Reducer类及reduce方法输出的键值类型
        job.setReducerClass(PatentReferenceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置文件的读取方式,文本文件;输出方式,文本文件
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 给Job是定输入文件的路径和输出结果的路径
        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        // 设置Combiner
        job.setCombinerClass(PatentReferenceReducer.class);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new P00010_PatentReferenceWithCombiner_0010(),args));
    }
}
Hadoop(十六)之使用Combiner优化MapReduce

相关文章: