hadoop集群搭建中配置了mapreduce的别名是yarn
[hadoop@master01 hadoop]$ mv mapred-site.xml.template mapred-site.xml
[hadoop@master01 hadoop]$ vi mapred-site.xml
<property> <name>mapreduce.framework.name </name> <value>yarn</value> </property>
单词分类计数可以联系到sql语句的分组进行理解;
根据key设置的不同来进行计数,再传递给reduceTask按照设定的key值进行汇总;
测试准备:
首先同步时间,然后master先开启hdfs集群,再开启yarn集群;用jps查看:
master上: 先有NameNode、SecondaryNameNode;再有ResourceManager;
slave上: 先有DataNode;再有NodeManager;
如果master启动hdfs和yarn成功,但是slave节点有的不成功,则可以使用如下命令手动启动:
| hadoop-daemon.sh start datanode |
| yarn-daemon.sh start nodemanager |
在本地创建几个txt文件,并上传到集群的"/data/wordcount/src"目录下;
单词计数:
工程结构图:
代码:大数据学习交流QQ群:217770236 让我们一起学习大数据
1 package com.mmzs.bigdata.yarn.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Mapper; 8 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 9 10 11 /** 12 * 这个是Mapper类,每一个Mapreduce作业必须存在Mapper类,Reduce类则是可选; 13 * Mapper类的主要作用是完成数据的筛选和过滤 14 * 15 * 自定义的Mapper类必须继承于Hadoop提供的Mapper类,并重写其中的方法完成MapTask 16 * 超类Mapper的泛型参数从左到右依次表示: 17 * 读取记录的键类型、读取记录的值类型、写出数据的键类型、写出数据的值类型 18 * 19 * Hadoop官方提供了一套基于高效网络IO传送的数据类型(如:LongWritable、Text等), 20 * 数据类型于java中原生的数据类型相对应,比如:LongWritable即为Long类型、Text即为String类型 21 * 22 * Hadoop的数据类型转换为Java类型只需要调用get方法即可(特例:Text转换为String类型调用toString) 23 * Java数据类型转换为Hadoop类型只需要使用构造方法包装即可,如: 24 * Long k = 10L; 25 * LongWritable lw = new LongWritable(k); 26 * 27 * @author hadoop 28 * 29 */ 30 public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 31 private Text outKey; 32 private LongWritable outValue; 33 /** 34 * 这是Mapper类的实例初始化方法,每一个MapTask对应一个Mapper实例, 35 * 每一个Mapper类被实例化之后将首先调用setup方法完成初始化操作, 36 * 对于每一个MapTask,setup方法有且仅被调用一次; 37 */ 38 @Override 39 protected void setup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) 40 throws IOException, InterruptedException { 41 outKey = new Text(); 42 outValue = new LongWritable(); 43 } 44 45 46 /** 47 * 此方法在setup方法之后,cleanup方法之前调用,此方法会被调用多次,被处理的文件中的每一条记录都会调用一次该方法; 48 * 第一个参数:key 代表所读取记录相对于文件开头的起始偏移量(单位:byte) 49 * 第二个参数:value 代表所读取到的记录内容本身 50 * 第三个参数:contex 记录迭代过程的上下文 51 */ 52 @Override 53 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) 54 throws IOException, InterruptedException { 55 56 FileSplit fp = (FileSplit) context.getInputSplit(); 57 String fileName = fp.getPath().getName(); 58 // int i = fileName.lastIndexOf("."); 59 // String fileNameSimple = fileName.substring(0, 1); 60 61 String line = value.toString(); 62 String[] words = line.split("\\s+"); 63 for (String word : words) { 64 outKey.set(fileName+":: "+word); 65 outValue.set(1); 66 context.write(outKey, outValue); 67 } 68 } 69 70 /** 71 * 这是Mapper类的实例销毁方法, 72 * 每一个Mapper类的实例将数据处理完成之后,于对象销毁之前有且仅调用一次cleanup方法 73 */ 74 @Override 75 protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context) 76 throws IOException, InterruptedException { 77 outKey = null; 78 outValue = null; 79 } 80 81 }