前言
前面的一篇给大家写了一些MapReduce的一些程序,像去重、词频统计、统计分数、共现次数等。这一篇给大家介绍的是关于Combiner优化操作。
1.1、为什么需要Combiner
我们map任务处理的结果是存放在运行map任务的节点上。
map处理的数据的结果在进入reduce的时候,reduce会通过远程的方式去获取数据。
在map处理完数据之后,数据量特别大的话。reduce再去处理数据它就要通过网络去获取很多的数据。
这样会导致一个问题是:大量的数据会对网络带宽造成一定的影响。
有没有一种方式能够类似reduce一样,在map端处理完数据之后,然后在reduce端进行一次简单的数据处理?
MapReudce正常处理是:
map处理完,中间结果存放在map节点上。reduce处理的数据通过网络形式拿到reduce所在的节点上。
如果我们能够在map端进行一次类似于reduce的操作,这样会使进入reduce的数据就会少很多。
我们把在map端所执行的类似于reduce的操作成为Combiner。
1.2、Combiner介绍
1) 前提
每一个map都可能会产生大量的本地输出
2)Combiner功能
对map端的输出先做一次合并
3)目的
减少在map和reduce节点之间的数据传输量, 以提高网络IO性能。
二、使用Combiner优化Mapduce执行
2.1、使用前提
不能对最原始的map的数据流向reduce造成影响。也就是说map端进入reduce的数据不收Combiner的影响。
数据输入的键值类型和数据输出的键值类型一样的reduce我们可以把它当做Combiner来使用
举例:
我们前面一篇博客中有一个处理的是求用户的好友列表的数据。
我们之后进入map端的数据类型为LongWritable,Text,而map端输出的数据类型为Text,Text(用户,好友),进入reduce之后reduce的输入类型为Text,Text,
最后reduce的输出也是Text,Text(用户,好友列表)。
这样总结:
reduce的输入类型等于reduce输出的数据类型,这样符合Combiner的情况。(这样我们就不需要去自定义数据类型了)
2.2、怎么使用
其实Combiner的本质就是一个reducer,那我们要写Combiner我们就要继承reducer。
下面写一个例子,首先你需要了解我前面写的一个专利引用的例子,才能了解专利文件数据格式。
需求:求这个专利以及这个专利它引用了哪些专利。
import com.briup.bd1702.hadoop.mapred.utils.PatentRecordParser;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PatentReferenceWithCombiner_0010 extends Configured implements Tool{
public static class PatentReferenceMapper extends Mapper<LongWritable,Text,Text,Text>{
private PatentRecordParser parser=new PatentRecordParser();
private Text key=new Text();
private Text value=new Text();
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
parser.parse(value);
if(parser.isValid()){
this.key.set(parser.getPatentId());
this.value.set(parser.getRefPatentId());
context.write(this.key,this.value);
}
}
}
public static class PatentReferenceReducer extends Reducer<Text,Text,Text,Text>{
private Text value=new Text();
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
StringBuffer refPatentIds=null;
for(Text value:values){
refPatentIds.append(value.toString()+",");
}
this.value.set(refPatentIds.toString());
context.write(key,value);
}
}
@Override
public int run(String[] args) throws Exception{
Configuration conf=getConf();
Path input=new Path(conf.get("input"));
Path output=new Path(conf.get("output"));
// 构建Job对象,并设置驱动类名和Job名,用于提交作业
Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
// 给Job设置Mapper类以及map方法输出的键值类型
job.setMapperClass(PatentReferenceMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 给Job设置Reducer类及reduce方法输出的键值类型
job.setReducerClass(PatentReferenceReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置文件的读取方式,文本文件;输出方式,文本文件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 给Job是定输入文件的路径和输出结果的路径
TextInputFormat.addInputPath(job,input);
TextOutputFormat.setOutputPath(job,output);
// 设置Combiner
job.setCombinerClass(PatentReferenceReducer.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception{
System.exit(ToolRunner.run(new P00010_PatentReferenceWithCombiner_0010(),args));
}
}