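- Requirement:
The task is to extract the data of interest from the given data, i.e. to mine information that is only implicit in the raw records. Let's walk through the example.
The example provides a child-parent table and asks for a grandchild-grandparent table as output. Since a grandparent is a parent's parent, this amounts to joining the table with itself on parent = child.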
================= Sample input: ===================
file:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
[Figure: family tree of the sample data]
The sample output is shown below.
file:
grandchild grandparent
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse
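To check the expected output independently of Hadoop, the self-join can be written as a plain nested loop over the sample rows. This is a minimal in-memory sketch, assuming the whole table fits in memory; the class `GrandJoinSketch`, its `SAMPLE` field and `join` method are hypothetical names, not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the Hadoop job): an in-memory self-join
// of the child-parent table, used only to verify the expected output.
public class GrandJoinSketch {

    // The sample child-parent rows, each stored as {child, parent}
    public static final List<String[]> SAMPLE = Arrays.asList(
            new String[] {"Tom", "Lucy"}, new String[] {"Tom", "Jack"},
            new String[] {"Jone", "Lucy"}, new String[] {"Jone", "Jack"},
            new String[] {"Lucy", "Mary"}, new String[] {"Lucy", "Ben"},
            new String[] {"Jack", "Alice"}, new String[] {"Jack", "Jesse"},
            new String[] {"Terry", "Alice"}, new String[] {"Terry", "Jesse"},
            new String[] {"Philip", "Terry"}, new String[] {"Philip", "Alma"},
            new String[] {"Mark", "Terry"}, new String[] {"Mark", "Alma"});

    // Nested-loop join on first.parent == second.child:
    // first.child is the grandchild, second.parent is the grandparent.
    public static List<String> join(List<String[]> rows) {
        List<String> result = new ArrayList<>();
        for (String[] first : rows) {
            for (String[] second : rows) {
                if (first[1].equals(second[0])) {
                    result.add(first[0] + "\t" + second[1]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String line : join(SAMPLE)) {
            System.out.println(line);
        }
    }
}
```

The nested loop needs O(n²) comparisons on one machine, which is why the MapReduce version below instead groups records by person and lets each reduce call handle one person.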
Here is the code (package it as a jar and run it on Linux):
package com.alibaba.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Parents {
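    public static class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on any whitespace so both tab- and space-separated input work
            String[] split = value.toString().trim().split("\\s+");
            // Skip blank or malformed lines and the "child parent" header row
            if (split.length < 2 || "child".equals(split[0])) {
                return;
            }
            String child = split[0];
            String parent = split[1];
            // "Left table": key = child, value = its parent, tagged "p"
            context.write(new Text(child), new Text(parent + ">p"));
            // "Right table": key = parent, value = its child, tagged "c"
            context.write(new Text(parent), new Text(child + ">c"));
        }
    }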
    public static class MRReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Parents of the person "key" (tagged "p") and children of "key" (tagged "c")
            ArrayList<String> plist = new ArrayList<>();
            ArrayList<String> clist = new ArrayList<>();
            for (Text t : values) {
                // Split once; toString() copies the value, which matters because Hadoop reuses the Text object
                String[] kv = t.toString().split(">");
                if ("p".equals(kv[1])) {
                    plist.add(kv[0]);
                } else {
                    clist.add(kv[0]);
                }
            }
            // Every child of "key" is a grandchild of every parent of "key"
            for (String c : clist) {
                for (String p : plist) {
                    context.write(new Text(c), new Text(p));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        // Create the job configuration
        Configuration cfg = new Configuration();
        // Create the job from that configuration
        Job job = Job.getInstance(cfg);
        // Locate the jar through the class that contains main()
        job.setJarByClass(Parents.class);
        // Input path of the child-parent file(s)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Mapper class
        job.setMapperClass(MRMapper.class);
        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Output directory (must not exist before the job runs)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Reducer class and final output key/value types
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Run the job and report success or failure through the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
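The job works by emitting every record twice: keyed by the child with the parent tagged "p", and keyed by the parent with the child tagged "c". After the shuffle, one reduce call sees all parents and all children of the same person, and their cartesian product is exactly that person's grandchild-grandparent pairs. The sketch below simulates map, shuffle and reduce in plain Java so the logic can be tried without a cluster; the class `LocalJoinSimulation` and its members are hypothetical, a `TreeMap` stands in for Hadoop's sort/shuffle, and (as in the mapper above) a row whose first field is "child" is assumed to be the header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the Parents job: map -> shuffle -> reduce.
// A TreeMap stands in for Hadoop's sort/shuffle (keys come out sorted).
public class LocalJoinSimulation {

    public static final List<String> SAMPLE_LINES = Arrays.asList(
            "child parent", "Tom Lucy", "Tom Jack", "Jone Lucy", "Jone Jack",
            "Lucy Mary", "Lucy Ben", "Jack Alice", "Jack Jesse",
            "Terry Alice", "Terry Jesse", "Philip Terry", "Philip Alma",
            "Mark Terry", "Mark Alma");

    public static List<String> run(List<String> lines) {
        // "Map" + "shuffle": collect the tagged values under each key
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String line : lines) {
            String[] split = line.trim().split("\\s+");
            if (split.length < 2 || "child".equals(split[0])) {
                continue; // skip blank/malformed lines and the assumed header row
            }
            String child = split[0];
            String parent = split[1];
            shuffled.computeIfAbsent(child, k -> new ArrayList<>()).add(parent + ">p");
            shuffled.computeIfAbsent(parent, k -> new ArrayList<>()).add(child + ">c");
        }
        // "Reduce": for each person, pair every child with every parent
        List<String> output = new ArrayList<>();
        for (List<String> values : shuffled.values()) {
            List<String> parents = new ArrayList<>();
            List<String> children = new ArrayList<>();
            for (String v : values) {
                String[] kv = v.split(">");
                if ("p".equals(kv[1])) {
                    parents.add(kv[0]);
                } else {
                    children.add(kv[0]);
                }
            }
            for (String c : children) {
                for (String p : parents) {
                    output.add(c + "\t" + p);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        for (String line : run(SAMPLE_LINES)) {
            System.out.println(line);
        }
    }
}
```

Only the keys Jack, Lucy and Terry have both parents and children in the sample, so only those three reduce calls produce output; on a real cluster the order of values inside a key is not guaranteed, so the order of lines within a group may differ.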