Storing the computed results of the user table in partitions
Test preparation:
First synchronize the clocks across the nodes, then on master start the HDFS cluster first and the YARN cluster second; verify with jps:
On master: first NameNode and SecondaryNameNode; then ResourceManager;
On each slave: first DataNode; then NodeManager;
If HDFS and YARN start successfully on master but some daemons on the slave nodes do not come up, you can start them manually with the following commands:
| hadoop-daemon.sh start datanode |
| yarn-daemon.sh start nodemanager |
Then create myinfo.txt in the local environment of a cluster host; its contents are as follows:
Then upload the test file myinfo.txt to the cluster:
Test goal:
Hadoop cluster partitioning and caching:
1. Partitioning necessarily goes through the Shuffle phase; without Shuffle, partitioning cannot be performed.
2. Partitioning is driven by the keys output by the MapTask; the default partitioning algorithm is the hash-mod method:
Hash-mod method:
Call hashCode() on the Map output key to obtain its hash code, a numeric value; bitwise-AND (&) this hash code with Integer.MAX_VALUE (to clear the sign bit), then take the result modulo the number of ReduceTasks. The remainder is the index of the Reduce node that the key falls into.
-------------------------
int mod = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
dividend = 34567234
numReduceTasks = 3
------ Result:
The possible remainders 0, 1, and 2 serve as the indices of the Reduce nodes;
The hash-mod method is implemented by the HashPartitioner class and is the default partitioning algorithm in MapReduce;
Test code:
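The hash-mod calculation described above can be checked with a small standalone snippet; the sample values (34567234 as the key's hash code, 3 ReduceTasks) are taken from the example above:

```java
public class HashModDemo {
    public static void main(String[] args) {
        int hash = 34567234;     // example hash code from the text
        int numReduceTasks = 3;  // example ReduceTask count

        // Clear the sign bit so the result is non-negative, then take the modulus
        int partition = (hash & Integer.MAX_VALUE) % numReduceTasks;

        // 34567234 % 3 == 1, so this key would land on Reduce node index 1
        System.out.println(partition);
    }
}
```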
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private LongWritable outKey;
    private Text outValue;

    @Override
    protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Reuse one key/value object across all map() calls
        outKey = new LongWritable();
        outValue = new Text();
    }

    @Override
    protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        outKey = null;
        outValue = null;
    }

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Split the input line on whitespace: userId, then two more fields
        String[] fields = value.toString().split("\\s+");
        long userId = Long.parseLong(fields[0]);
        outKey.set(userId);
        outValue.set(new StringBuilder(fields[1]).append("\t").append(fields[2]).toString());
        context.write(outKey, outValue);
    }
}
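The driver and partitioner classes are not shown in this excerpt. As a minimal sketch of how the default hash-mod algorithm described above plugs into this job, a custom partitioner matching PartitionMapper's output types could look like this (the class name UserIdPartitioner is an assumption; its logic simply mirrors Hadoop's default HashPartitioner):

```java
package com.mmzs.bigdata.yarn.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner for illustration; replicates HashPartitioner's hash-mod logic
public class UserIdPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numReduceTasks) {
        // Clear the sign bit, then take the key's hash modulo the ReduceTask count
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

In the driver, this would be registered with job.setPartitionerClass(UserIdPartitioner.class) alongside job.setNumReduceTasks(3); since it reproduces the default behavior, omitting it and letting HashPartitioner run gives the same result.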