两张表链接操作(分布式缓存):

----------------------------------
假设:
其中一张A表,只有20条数据记录(比如group表)
另外一张非常大,上亿的记录数量(比如user表)
----------------------------------
策略:
将数据集小的文件直接装载到内存,然后迭代大文件记录;

 

分布式缓存的两种角度理解(针对较小数据集):
1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;

 

测试准备

首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:

master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;

slave上:   先有DataNode;再有NodeManager;

如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动: 

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

然后在集群中的“/data/cacheusergroup/smalldata/”目录下上传本地文件group表,在“/data/cacheusergroup/src”目录下长传本地文件user表;内容如下:

YARN集群的mapreduce测试(六)

YARN集群的mapreduce测试(六)

 

测试目标:

1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;

 

测试代码大数据学习交流QQ群:217770236 让我们一起学习大数据

1、直接存储到各个节点上的本地磁盘中:

在mapper中的setup中要获取到数据在本地磁盘中的路径:

(从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用)

//因为StringBuilder比StringBuffer效率更高;StringBuilder是多线程的,当不存在内容修改时,是最好的选择;
 1 package com.mmzs.bigdata.yarn.mapreduce;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.File;
 5 import java.io.FileInputStream;
 6 import java.io.IOException;
 7 import java.io.InputStreamReader;
 8 import java.net.URI;
 9 import java.util.HashMap;
10 import java.util.Map;
11 
12 import org.apache.commons.io.IOUtils;
13 import org.apache.hadoop.conf.Configuration;
14 import org.apache.hadoop.fs.Path;
15 import org.apache.hadoop.io.LongWritable;
16 import org.apache.hadoop.io.Text;
17 import org.apache.hadoop.mapreduce.MRJobConfig;
18 import org.apache.hadoop.mapreduce.Mapper;
19 import org.apache.hadoop.util.StringUtils;
20 
21 public class CacheGroupMapper extends Mapper<LongWritable, Text, Text, Text> {
22     
23     private Text outKey;
24     private Text outValue;
25     private Map<String, String> smallMap;
26     
27     @Override
28     protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
29             throws IOException, InterruptedException {
30         outKey = new Text();
31         outValue = new Text();
32         smallMap = new HashMap<String, String>();
33         
34         Configuration conf = context.getConfiguration();
35                 
36         //从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用
37         //获取缓存到本地磁盘的路径
38 //        /*
39         String[] localPaths = conf.getStrings(MRJobConfig.CACHE_LOCALFILES);
40         Path[] path = StringUtils.stringToPath(localPaths);
41         Path localPath = path[0];
42         URI localUri = localPath.toUri();
43         File fp = new File(localUri);
44         BufferedReader br =null;
45         String line = null; 
46         try{
47             //BufferedReader类写入文件的过程;读的是group表
48             br = new BufferedReader(new InputStreamReader(new FileInputStream(fp)));
49             while (null != (line=br.readLine()) ) {
50                 String[] fields = line.split("\\s+");
51                 smallMap.put(fields[0], fields[1]);
52             }
53         }finally{
54             IOUtils.closeQuietly(br);
55         }
56 //         */
57     }
58 
59     @Override
60     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
61             throws IOException, InterruptedException {
62         String line = value.toString();
63         String[] fields = line.split("\\s+");
64         
65         String groupId = fields[2];//user表中第三列是groupId
66         String groupInfo = smallMap.get(groupId);
67         if (null == groupInfo) {
68             return;
69         }
70         
71         outKey.set(fields[2]);//这是user表中的第三列groupId
72         //因为StringBuilder比StringBuffer效率更高;(因为map是一行一行的读取内容的不存在线程安全问题,所以使用StringBuilder是更好的选择)
73         outValue.set(new StringBuilder(groupInfo).append("\t").append(fields[0]).append("\t").append(fields[1]).toString());
74         context.write(outKey, outValue);
75     }
76     
77     @Override
78     protected void cleanup(Mapper<LongWritable, Text, Text, Text>.Context context)
79             throws IOException, InterruptedException {
80         outKey = null;
81         outValue = null;
82     }
83     
84 }
CacheGroupMapper

相关文章: