Storing the computed results of the user table in partitions

 

Test preparation:

First synchronize the clocks across the nodes, then on master start the HDFS cluster first and the YARN cluster second; verify with jps:

On master: first NameNode and SecondaryNameNode; then ResourceManager;

On slave: first DataNode; then NodeManager;

If HDFS and YARN start successfully from master but some daemons on a slave node fail to come up, they can be started manually with the following commands:

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

Then create myinfo.txt in the local filesystem on the cluster's master host; its content is as follows:

(the file contents were shown as a screenshot in the original post)
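The screenshot of the file is not recoverable; the commands below create a plausible stand-in myinfo.txt whose three whitespace-separated columns (userId, name, score — an assumed layout matching the three fields the mapper code later splits out) can be used instead:

```shell
# Create a sample myinfo.txt; the column layout (userId, name, score)
# is an assumption, not the original post's data.
printf '1001\tAlice\t85\n1002\tBob\t90\n1003\tCarol\t78\n' > myinfo.txt
cat myinfo.txt
```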

Then upload the test file myinfo.txt to the cluster:

(the upload commands and their results were shown as screenshots in the original post)

 

Test goals:

Hadoop cluster partitioning and caching:
1. Partitioning always goes through the Shuffle phase; without Shuffle there is no way to perform partitioning.
2. Partitioning is driven by the key that each MapTask outputs; the default partitioning algorithm is the hash-modulo method:

Hash-modulo method:
Call hashCode() on the Map output key to obtain its hash code, a numeric value. Bitwise-AND (&) this hash code with Integer.MAX_VALUE to clear the sign bit, then take the result modulo the number of ReduceTasks; the remainder is the index of the Reduce node that the key is routed to.
-------------------------
int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
hash code = 34567234
numReduceTasks = 3
------ Result:
(34567234 & Integer.MAX_VALUE) % 3 = 1; in general the possible indices 0, 1, 2 identify the three Reduce nodes;
The hash-modulo method is implemented by the HashPartitioner class and is the default partitioning algorithm in MapReduce.
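The computation above can be reproduced in plain Java (a sketch of the same arithmetic, not Hadoop's actual HashPartitioner class):

```java
public class HashModuloDemo {
    // The same hash-modulo arithmetic that HashPartitioner applies.
    static int partitionFor(Object key, int numReduceTasks) {
        // & Integer.MAX_VALUE clears the sign bit so the remainder is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Long.hashCode() of a small positive value equals the value itself,
        // so the worked example from the text gives 34567234 % 3 = 1.
        System.out.println(partitionFor(Long.valueOf(34567234L), 3)); // prints 1
    }
}
```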

 

Test code

package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Emits userId as the output key, so that the default HashPartitioner
 * routes records to reducers by userId.
 */
public class PartitionMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private LongWritable outKey;
    private Text outValue;

    @Override
    protected void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Create the Writable instances once and reuse them across map() calls.
        outKey = new LongWritable();
        outValue = new Text();
    }

    @Override
    protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        outKey = null;
        outValue = null;
    }

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Each input line: userId followed by two more whitespace-separated fields.
        String[] fields = value.toString().split("\\s+");
        long userId = Long.parseLong(fields[0]);
        outKey.set(userId);
        outValue.set(fields[1] + "\t" + fields[2]);
        context.write(outKey, outValue);
    }

}
PartitionMapper
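When the default hash-modulo routing is not what a job needs, the routing rule can be replaced by a custom Partitioner. The sketch below shows only the routing logic as plain Java (the boundary value 1000 is hypothetical, purely for illustration); in a real job this logic would live in getPartition(...) of a class extending org.apache.hadoop.mapreduce.Partitioner<LongWritable, Text>, registered in the driver via job.setPartitionerClass(...):

```java
public class UserIdRouting {
    // Hypothetical routing rule for userId keys: ids below 1000 go to
    // partition 0, the rest are spread over the remaining partitions.
    static int route(long userId, int numReduceTasks) {
        if (numReduceTasks == 1) return 0;   // single reducer: one partition only
        if (userId < 1000L) return 0;        // hypothetical boundary for "small" ids
        return (int) (userId % (numReduceTasks - 1)) + 1;
    }

    public static void main(String[] args) {
        System.out.println(route(1001L, 3)); // prints 2
    }
}
```

Whatever rule is used, every value it returns must lie in 0..n-1 for the n set by job.setNumReduceTasks(n) in the driver.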
