分区、分组

分区:在Mapper的输出时进行,默认会采用HashPartitioner,会根据key值和reduce数进行分组;在写入MapOutputBuffer的缓冲区之前每个kv对就已经获取了对应的分区索引,在溢写时默认会根据分区索引从小到大,key值从小到大进行排序;并且rudecer数决定了分区数量,因为一个reducer只能处理一个分区。自定义分组器通过Job.setPartitionerClass()指定。

分组:在Reducer端进行,默认使用的是WritableComparator比较器进行对key值的比较,key值相同的会被分在一组。而reduce()函数是按照组为操作对象进行统计的,也就是有多少个组,则调用几次reduce()函数。这就与Mapper.run()不同,Mapper的操作对象是kv对,有多少个kv对则调用几次map()方法。自定义分组器通过Job.setGroupingComparatorClass()指定。

不妨对比一下map()和reduce()方法

  //操作对象是kv对
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  //操作对象是组,Iterable<VALUEIN> values可以看成是该组的迭代器
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

也可以这么理解,reducer的输入为<k2,list(v2)>,当然v2并不是以列表的形式存放,而是相同的key值,对应着一组的value,reduce()方法是对一个key和它的一组value进行操作的

 

下面可以测试一下是否分组数决定了reduce()被调用的次数

源文件如下

a	1
b	1
b	2
c	1
c	2
d	1

根据key值所以默认应该是由4组,接下来采用KeyValueInputFormat,所以mapper为默认的,reducer中有一个次数器

public class MyReducer extends Reducer<Text, Text, Text, Text>{

	int reduceNum = 0;
	
	protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
			throws IOException, InterruptedException {
		reduceNum++;
		System.out.println("这是第"+reduceNum+"次调用reduce()");
	}
	
}

运行一下,日志信息

MapReduce:分区与分组

map输出6条记录,key值相同的经过分组后为4组,打印信息:

MapReduce:分区与分组

果然,只调用了4次reduce()

 

分组的有点像是在同一个Reducer内进行"分区",这在自定义分组器对组合键进行分组很有优势。

可参考《Hadoop权威指南 第4版》P259的辅助排序,案例中组合键为<年份-温度>,并且一同年份的温度为降序排序。为了求出每年的最高温,当然可以自定义分区器,将同一年份的分为一个区,但是这样会导致一个reducer内会进行分组,由于组合键中只有第一字段相同,分组器为视为key值不同,那么此时分区也没什么意义了;解决方案是进行自定义分组器,将同一年份的分在一组,那么一共有多少个年份就会调用几个reduce(),在对每个分组输出首行记录即为该年的最高温度。

 

案例

当前有3个文件,内容如下

w1.txt:
MapReduce is simple   

w2.txt:
MapReduce is powerful is simple

w3.txt
Hello MapReduce bye MapReduce

要求统计每个单词在不同文件中的出现频率,输出格式为: 单词  文件1 次数 文件2 次数...

可以自定义组合键<单词,文件名>,Mapper输出<[单词,文件名],1>,在combine环节输出<[单词,文件名],次数>,然后自定义分组器,将同一个单词的记录分在一组,在reduce()中仅对文件名和次数进行字符串拼接即可。

由于没有排序的要求,当然也可以通过自定义分区器实现,不过这样有多少个单词就会有少个分区,记录会分散在多个文件中。

 

代码如下

自定义Writable类 WordFile 略

Mapper:

public class WFMapper extends Mapper<LongWritable, Text, WordFile, IntWritable>{

	IntWritable one = new IntWritable(1);
	Text word = new Text();
	Text fileName = new Text();
	WordFile wf = new WordFile();
	
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, WordFile, IntWritable>.Context context)
			throws IOException, InterruptedException {
		
		InputSplit split = context.getInputSplit();
		Path path = ((FileSplit)split).getPath();
		fileName.set(path.getName());
		
		StringTokenizer itr = new StringTokenizer(value.toString(), " ");
		while(itr.hasMoreTokens()){
			word.set(itr.nextToken());
			wf.set(word, fileName);
			context.write(wf, one);
		}
	}
}

Combiner:

public class WFCombiner extends Reducer<WordFile, IntWritable, WordFile, IntWritable>{

	IntWritable total = new IntWritable();
	
	protected void reduce(WordFile wf, Iterable<IntWritable> ones,
			Reducer<WordFile, IntWritable, WordFile, IntWritable>.Context context)
					throws IOException, InterruptedException {
		int sum = 0;
		for(IntWritable one : ones){
			sum +=1;
		}
		total.set(sum);
		context.write(wf, total);
	}
	
}

GroupComparator:

public class WFGroupComparator extends WritableComparator {
	public  WFGroupComparator(){
		super(WordFile.class,true);
	}

	public int compare(WritableComparable a, WritableComparable b) {
		WordFile wf1 = (WordFile) a;
		WordFile wf2 = (WordFile) b;
		return wf1.getWord().compareTo(wf2.getWord());
	}	
}

Reducer:

public class WFReducer extends Reducer<WordFile, IntWritable, Text, Text> {

	Text text = new Text();

	protected void reduce(WordFile wf, Iterable<IntWritable> sum,
			Reducer<WordFile, IntWritable, Text, Text>.Context context) throws IOException, InterruptedException {
		String str = "";
		for (IntWritable s : sum) {
			str += wf.getFileName().toString() + "  " + s + "\t";
		}
		text.set(str);
		context.write(wf.getWord(), text);
	}
}

 

相关文章:

  • 2021-09-21
  • 2021-07-04
  • 2022-12-23
  • 2021-11-20
  • 2021-09-27
  • 2021-08-13
  • 2022-01-08
  • 2022-12-23
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-02-01
  • 2021-11-06
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案