两张表链接操作(分布式缓存):
----------------------------------
假设:
其中一张A表,只有20条数据记录(比如group表)
另外一张非常大,上亿的记录数量(比如user表)
----------------------------------
策略:
将数据集小的文件直接装载到内存,然后迭代大文件记录;
分布式缓存的两种角度理解(针对较小数据集):
1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;
测试准备:
首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:
master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;
slave上: 先有DataNode;再有NodeManager;
如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动:
| hadoop-daemon.sh start datanode |
| yarn-daemon.sh start nodemanager |
然后在集群中的“/data/cacheusergroup/smalldata/”目录下上传本地文件group表,在“/data/cacheusergroup/src”目录下长传本地文件user表;内容如下:
测试目标:
1、将文件不切块,直接存储到各个节点上的本地磁盘中,这种模式的缓存只是减少了网络IO,磁盘IO并没有减少;
2、将文件不切块,直接存储到各个节点上的任务进程内存中(MapTask进程,JVM会开辟一块任务内存),这种模式的缓存从根本上取消了该文件的读取IO操作;
测试代码:大数据学习交流QQ群:217770236 让我们一起学习大数据
1、直接存储到各个节点上的本地磁盘中:
在mapper中的setup中要获取到数据在本地磁盘中的路径:
(从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用)
//因为StringBuilder比StringBuffer效率更高;StringBuilder是多线程的,当不存在内容修改时,是最好的选择;
1 package com.mmzs.bigdata.yarn.mapreduce; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileInputStream; 6 import java.io.IOException; 7 import java.io.InputStreamReader; 8 import java.net.URI; 9 import java.util.HashMap; 10 import java.util.Map; 11 12 import org.apache.commons.io.IOUtils; 13 import org.apache.hadoop.conf.Configuration; 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.LongWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.MRJobConfig; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.util.StringUtils; 20 21 public class CacheGroupMapper extends Mapper<LongWritable, Text, Text, Text> { 22 23 private Text outKey; 24 private Text outValue; 25 private Map<String, String> smallMap; 26 27 @Override 28 protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) 29 throws IOException, InterruptedException { 30 outKey = new Text(); 31 outValue = new Text(); 32 smallMap = new HashMap<String, String>(); 33 34 Configuration conf = context.getConfiguration(); 35 36 //从本地磁盘读取数据到内存中缓存起来,之后在map方法的迭代中就可以直接使用 37 //获取缓存到本地磁盘的路径 38 // /* 39 String[] localPaths = conf.getStrings(MRJobConfig.CACHE_LOCALFILES); 40 Path[] path = StringUtils.stringToPath(localPaths); 41 Path localPath = path[0]; 42 URI localUri = localPath.toUri(); 43 File fp = new File(localUri); 44 BufferedReader br =null; 45 String line = null; 46 try{ 47 //BufferedReader类写入文件的过程;读的是group表 48 br = new BufferedReader(new InputStreamReader(new FileInputStream(fp))); 49 while (null != (line=br.readLine()) ) { 50 String[] fields = line.split("\\s+"); 51 smallMap.put(fields[0], fields[1]); 52 } 53 }finally{ 54 IOUtils.closeQuietly(br); 55 } 56 // */ 57 } 58 59 @Override 60 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) 61 throws IOException, InterruptedException { 62 String line = value.toString(); 63 String[] fields = line.split("\\s+"); 64 65 String groupId = fields[2];//user表中第三列是groupId 66 String groupInfo = smallMap.get(groupId); 67 if (null == groupInfo) { 68 return; 69 } 70 71 outKey.set(fields[2]);//这是user表中的第三列groupId 72 //因为StringBuilder比StringBuffer效率更高;(因为map是一行一行的读取内容的不存在线程安全问题,所以使用StringBuilder是更好的选择) 73 outValue.set(new StringBuilder(groupInfo).append("\t").append(fields[0]).append("\t").append(fields[1]).toString()); 74 context.write(outKey, outValue); 75 } 76 77 @Override 78 protected void cleanup(Mapper<LongWritable, Text, Text, Text>.Context context) 79 throws IOException, InterruptedException { 80 outKey = null; 81 outValue = null; 82 } 83 84 }