The code lives on GitHub: https://github.com/zuodaoyong/Hadoop
1. Reduce Join (prone to data skew)
The join key serves as the key of the Map output: records of both tables that satisfy the join condition are tagged with the name of their source file and sent to the same ReduceTask, where the Reduce phase stitches them together (a sketch follows the sample tables below).
Requirement: merge the rows of the product information table into the order table by product pid.
Order data:
| id   | pid | amount |
| 1001 | 01  | 1      |
| 1002 | 02  | 2      |
| 1003 | 03  | 3      |
| 1004 | 01  | 4      |
| 1005 | 02  | 5      |
| 1006 | 03  | 6      |
Product information:
| pid | pname |
| 01  | 小米  |
| 02  | 华为  |
| 03  | 格力  |
Merged result:
| id   | pname | amount |
| 1001 | 小米  | 1      |
| 1004 | 小米  | 4      |
| 1002 | 华为  | 2      |
| 1005 | 华为  | 5      |
| 1003 | 格力  | 3      |
| 1006 | 格力  | 6      |
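The original post shows code only for the Map Join, so here is a minimal sketch of the Reduce Join described above. The class names ReduceJoinMapper and ReduceJoinReducer are hypothetical; the sketch assumes the same OrderWrapper bean used by the Map Join code later (itself sketched after that code), with a flag field recording which table a record came from.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
// (in practice each class lives in its own file)
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, OrderWrapper> {
    private String fileName;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // remember which input file this split comes from
        fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        OrderWrapper wrapper = new OrderWrapper();
        if ("pd".equals(fileName)) {
            // product line: pid \t pname
            wrapper.setPid(fields[0]);
            wrapper.setPname(fields[1]);
            wrapper.setFlag("pd");
        } else {
            // order line: id \t pid \t amount
            wrapper.setId(fields[0]);
            wrapper.setPid(fields[1]);
            wrapper.setAmount(Integer.valueOf(fields[2]));
            wrapper.setFlag("order");
        }
        // the join key pid sends matching rows of both tables to the same ReduceTask
        context.write(new Text(wrapper.getPid()), wrapper);
    }
}
public class ReduceJoinReducer extends Reducer<Text, OrderWrapper, OrderWrapper, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<OrderWrapper> values, Context context)
            throws IOException, InterruptedException {
        String pname = "";
        List<OrderWrapper> orders = new ArrayList<>();
        for (OrderWrapper v : values) {
            if ("pd".equals(v.getFlag())) {
                pname = v.getPname();
            } else {
                // Hadoop reuses the value object, so copy each order record
                OrderWrapper copy = new OrderWrapper();
                copy.setId(v.getId());
                copy.setPid(v.getPid());
                copy.setAmount(v.getAmount());
                orders.add(copy);
            }
        }
        // stitch the product name into every order with this pid
        for (OrderWrapper order : orders) {
            order.setPname(pname);
            context.write(order, NullWritable.get());
        }
    }
}
Because every row with a given pid lands on one ReduceTask, a hot product key concentrates load on a single reducer, which is exactly the data skew the heading warns about.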
2. Map Join
Map Join suits the case where one table is very small and the other is very large.
The small table is cached on the Map side and the join logic runs there, shifting work into the Map phase, easing the data pressure on the Reduce side, and avoiding data skew as far as possible.
It uses the DistributedCache:
(1) In the Mapper's setup phase, read the cached file into an in-memory collection.
(2) In the driver, register the file with the cache:
// register the cached product file
job.addCacheFile(new URI("/mapreduce/join/pd"));
// no Reduce phase is needed
job.setNumReduceTasks(0);
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class DistributedCacheMapper extends Mapper<LongWritable, Text, OrderWrapper, NullWritable> {
    private Map<String, String> pdMap = new HashMap<>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // open the cached product file
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        FSDataInputStream in = fileSystem.open(new Path(path));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        // read to end of file; the original loop stopped at the first blank line
        while ((line = bufferedReader.readLine()) != null) {
            if (StringUtils.isEmpty(line)) {
                continue;
            }
            // split the line: pid \t pname
            String[] fields = line.split("\t");
            // cache pid -> pname
            pdMap.put(fields[0], fields[1]);
        }
        bufferedReader.close();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // skip the product file in case it also sits in the input directory
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        if (!"pd".equals(fileName)) {
            String line = value.toString();
            if (StringUtils.isNotEmpty(line)) {
                // order line: id \t pid \t amount
                String[] splits = line.split("\t");
                OrderWrapper wrapper = new OrderWrapper();
                wrapper.setId(splits[0]);
                wrapper.setPid(splits[1]);
                wrapper.setAmount(Integer.valueOf(splits[2]));
                // join on the Map side: look up pname in the cached table
                wrapper.setPname(pdMap.get(splits[1]));
                wrapper.setFlag(""); // flag is only meaningful in the Reduce Join
                context.write(wrapper, NullWritable.get());
            }
        }
    }
}
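The OrderWrapper bean is not shown in the original post; the following is a minimal sketch, assuming a plain WritableComparable whose toString() produces the tab-separated result rows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class OrderWrapper implements WritableComparable<OrderWrapper> {
    private String id = "";
    private String pid = "";
    private int amount;
    private String pname = "";
    private String flag = ""; // "order" or "pd"; only the Reduce Join needs it
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    }
    @Override
    public int compareTo(OrderWrapper o) {
        return this.id.compareTo(o.id); // sort output by order id
    }
    @Override
    public String toString() {
        // matches the merged result table: id \t pname \t amount
        return id + "\t" + pname + "\t" + amount;
    }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }
}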
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    job.setMapperClass(DistributedCacheMapper.class);
    job.setMapOutputKeyClass(OrderWrapper.class);
    job.setMapOutputValueClass(NullWritable.class);
    // register the cached product file
    job.addCacheFile(new URI("/mapreduce/join/pd"));
    // the Map Join needs no Reduce phase, so set the number of ReduceTasks to 0
    job.setNumReduceTasks(0);
    FileInputFormat.setInputPaths(job, new Path("/mapreduce/join/order"));
    FileOutputFormat.setOutputPath(job, new Path("/mapreduce/join/output"));
    boolean completed = job.waitForCompletion(true);
    System.exit(completed ? 0 : 1);
}
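With zero ReduceTasks the framework skips shuffle and reduce entirely: each mapper's output is written straight into /mapreduce/join/output, formatted by OrderWrapper.toString(), which is exactly the merged result shown above.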