分区、分组
分区:在Mapper的输出时进行,默认会采用HashPartitioner,会根据key值和reduce数进行分组;在写入MapOutputBuffer的缓冲区之前每个kv对就已经获取了对应的分区索引,在溢写时默认会根据分区索引从小到大,key值从小到大进行排序;并且rudecer数决定了分区数量,因为一个reducer只能处理一个分区。自定义分组器通过Job.setPartitionerClass()指定。
分组:在Reducer端进行,默认使用的是WritableComparator比较器进行对key值的比较,key值相同的会被分在一组。而reduce()函数是按照组为操作对象进行统计的,也就是有多少个组,则调用几次reduce()函数。这就与Mapper.run()不同,Mapper的操作对象是kv对,有多少个kv对则调用几次map()方法。自定义分组器通过Job.setGroupingComparatorClass()指定。
不妨对比一下map()和reduce()方法
//操作对象是kv对
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
//操作对象是组,Iterable<VALUEIN> values可以看成是该组的迭代器
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
也可以这么理解,reducer的输入为<k2,list(v2)>,当然v2并不是以列表的形式存放,而是相同的key值,对应着一组的value,reduce()方法是对一个key和它的一组value进行操作的
下面可以测试一下是否分组数决定了reduce()被调用的次数
源文件如下
a 1
b 1
b 2
c 1
c 2
d 1
根据key值所以默认应该是由4组,接下来采用KeyValueInputFormat,所以mapper为默认的,reducer中有一个次数器
public class MyReducer extends Reducer<Text, Text, Text, Text>{
int reduceNum = 0;
protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
throws IOException, InterruptedException {
reduceNum++;
System.out.println("这是第"+reduceNum+"次调用reduce()");
}
}
运行一下,日志信息
map输出6条记录,key值相同的经过分组后为4组,打印信息:
果然,只调用了4次reduce()
分组的有点像是在同一个Reducer内进行"分区",这在自定义分组器对组合键进行分组很有优势。
可参考《Hadoop权威指南 第4版》P259的辅助排序,案例中组合键为<年份-温度>,并且一同年份的温度为降序排序。为了求出每年的最高温,当然可以自定义分区器,将同一年份的分为一个区,但是这样会导致一个reducer内会进行分组,由于组合键中只有第一字段相同,分组器为视为key值不同,那么此时分区也没什么意义了;解决方案是进行自定义分组器,将同一年份的分在一组,那么一共有多少个年份就会调用几个reduce(),在对每个分组输出首行记录即为该年的最高温度。
案例
当前有3个文件,内容如下
w1.txt:
MapReduce is simple
w2.txt:
MapReduce is powerful is simple
w3.txt
Hello MapReduce bye MapReduce
要求统计每个单词在不同文件中的出现频率,输出格式为: 单词 文件1 次数 文件2 次数...
可以自定义组合键<单词,文件名>,Mapper输出<[单词,文件名],1>,在combine环节输出<[单词,文件名],次数>,然后自定义分组器,将同一个单词的记录分在一组,在reduce()中仅对文件名和次数进行字符串拼接即可。
由于没有排序的要求,当然也可以通过自定义分区器实现,不过这样有多少个单词就会有少个分区,记录会分散在多个文件中。
代码如下
自定义Writable类 WordFile 略
Mapper:
public class WFMapper extends Mapper<LongWritable, Text, WordFile, IntWritable>{
IntWritable one = new IntWritable(1);
Text word = new Text();
Text fileName = new Text();
WordFile wf = new WordFile();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, WordFile, IntWritable>.Context context)
throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit)split).getPath();
fileName.set(path.getName());
StringTokenizer itr = new StringTokenizer(value.toString(), " ");
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
wf.set(word, fileName);
context.write(wf, one);
}
}
}
Combiner:
public class WFCombiner extends Reducer<WordFile, IntWritable, WordFile, IntWritable>{
IntWritable total = new IntWritable();
protected void reduce(WordFile wf, Iterable<IntWritable> ones,
Reducer<WordFile, IntWritable, WordFile, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for(IntWritable one : ones){
sum +=1;
}
total.set(sum);
context.write(wf, total);
}
}
GroupComparator:
public class WFGroupComparator extends WritableComparator {
public WFGroupComparator(){
super(WordFile.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
WordFile wf1 = (WordFile) a;
WordFile wf2 = (WordFile) b;
return wf1.getWord().compareTo(wf2.getWord());
}
}
Reducer:
public class WFReducer extends Reducer<WordFile, IntWritable, Text, Text> {
Text text = new Text();
protected void reduce(WordFile wf, Iterable<IntWritable> sum,
Reducer<WordFile, IntWritable, Text, Text>.Context context) throws IOException, InterruptedException {
String str = "";
for (IntWritable s : sum) {
str += wf.getFileName().toString() + " " + s + "\t";
}
text.set(str);
context.write(wf.getWord(), text);
}
}