主要内容:mapreduce整体工作机制介绍;wordcont的编写(map逻辑 和 reduce逻辑)与提交集群运行;调度平台yarn的快速理解以及yarn集群的安装与启动。
回顾第HDFS第一天单词统计实例(HDFS版wordcount):
统计HDFS的/wordcount/input/a.txt文件中的每个单词出现的次数——wordcount
但是,进一步思考:如果文件又多又大,用上面那个程序有什么弊端?
慢!因为只有一台机器在进行运算处理
从这个简单的案例中我们收到一些启发:
1、可以在任何地方运行程序,访问HDFS上的文件并进行统计运算,并且可以把统计的结果写回HDFS的结果文件中;
2、如何变得更快?核心思想:让我们的运算程序并行在多台机器上执行!
3、面向接口编程,整个程序的主框架是通用的,使用业务接口编程,主流程形成通用的框架,写好之后不需要修改,我们只要按照业务接口,提供具体的业务实现类,即可完成具体的业务操作。
幸运的是,hadoop中已经为我们提供了分布式计算的解决方案,就是mapreduce计算框架,用来在分布式环境下处理数据。
下图是mapreduce整体工作机制的简要介绍,后续还会有详细的介绍。
mapreduce与我们之前自己写个hdfs版本的wordcount一样,都是运算程序,而且可以在分布式环境下,并行运行,他的主处理流程同我们的wordcount主处理流程一样都是成形的计算框架设定好之后就不会改变了,框架中用到的统一的业务接口和业务方法(业务接口数据处理逻辑)需要我们实现并提供给框架。框架按照接口规定方法中设定好的参数要求,想特定类型和形式的数据传给用户提供的实现类,用户接受框架传入的参数,进行业务处理并将处理结果写入接口中规定的数据缓存对象中。
mapreduce程序分俩个阶段,整个运算流程分为map阶段和reduce阶段,分别有mapTask和reduceTask程序来实现。mapTask和reduceTask的大致工作机制如下。
mapTask可以在很多机器上运行,具体运行多少个mapTask要看要处理的数据总量有多少,这个过程由程序自动计算,无需我们担心,计算好之后每个mapTask都会分到自己要处理的数据范围,术语叫做数据切片;一般来讲是这么计算的,赔了保证每个task处理的数据大体差不多,程序会将hdfs中的待处理的文件进行切片划分,默认一个切片(一个maptask要处理的数据范围)128M大小。假如要处理的文件有:a.txt(200m)b.txt(500m)c.txt(100m)d.txt(120m)如果一个task程序负责一个文件,显然是不公平不合理的,其实hadoop会按照128m大小为一个单位,对数据进行切片操作:a->2,b->4,c->1,d->1,总数据一共被切分成8个切片或者说8个任务,一个mapTask就处理一个切片或者说任务,一共需要8个maptask,那么就分配8个mapTask,这样每个mapTask就明确了自己的任务(所有task的处理逻辑都一样,都是上面提到的用户提供的业务实现类,只不过是处理的数据范围不同)和要处理的数据范围,接下来就是启动一批mapTask进行作业,当然如果文件很多很大,会需要很多的mapTask,至于一次启动多少个task以及一台物理机器会运行多少个mapTask,这和你的集群规模以及运行配置有关。mapTask就是一个程序,一台机子上可以启动过个mapTask,如果你你的集群只有两台机器负责mapTask运算,理论上每台机器会分启动4个maptask任务,但是若果机器性能有限,一次最多只能负载3个mapTask,也没关系,只不过是先运行一批mapTask(3+3=6个)每个task都有自己的任务只执行自己分陪到的任务,运行结束后在启动剩余的2个mapTask。所以不用担心机器不够用,既然任务分的很明确,可以每次运行一批mapTask,分批完成全部的。
maptask启动后会干什么,这个过程已经在mapreduce中写死了,每个mapTask会分到部分数据,然后一行一行的去读数据,每读一行数据,进行一次处理,具体的处理逻辑有用户提供的接口实现类来完成(需要用户提供具体的业务实现类,并且以某种方式通知mapreduce框架去调用哪一个实现类,可以通过配置文件或者参数的形式;mapTask将读到的数据作为参数传给业务方法,业务方法将处理的结果传给mapTask)。
那么这样还有一个问题,每个mapTask的处理数据范围和结果都只是整个数据的一个局部,并非全局结果,如何得到全局结果,这就需要mapreduce的第二个阶段,reduce阶段进行局部数据汇总统计。
reduce阶段有reduceTask程序来实现,可以在很多他机器上并行运行。reduceTask数量与mapTask数量没有关系,reduceTask要整理mapTask产生的数据,就需要统一大家的数据形式,这里统一为key :value键值对的形式。mapTask产生的key:value需要传递给reduceTask,而且核心思想是同时要确保,相同key的数据必须传递给同一个reduceTask,这就需要mapTask和reduceTask之间的数据分发机制,shuffle机制:可以相同的key:value数据一定发给同一个reduceTask程序。
reduce Task 聚合操作具体做什么,聚合操作就是对key相同的一组数据进行处理,具体的聚合逻辑通过接口的方式暴露给用户,由用户来指定(同mapTask方式)。
reduce Task处理结果,reduce Task将最后的聚合结果写入hdfs中,每个reduce Task最终形成一个文件,文件名称默认是part-r+reduceTask的编号。
总结:
map阶段,我们只需要提供具体的业务类,对mapTask读到的一行数据进行处理
reduce阶段,仍然需要我们提供具体的逻辑,对reduce拿到的一组相同key的kv数据进行处理
处理结果的传递:无论是map阶段还是recude阶段,数据处理结果的后续流程无需我们关系,我们只需要将各个阶段的数据都交给人家提供好的context对象就好;map阶段会将数据存着,将来想方设法地将数据结果传递给reduceTask,而且保证同一个key只给同一个reduce,reduce阶段会将数据写入hdfs,只要有一个结果key:value,就会往文件中追加一行。
2、wordcount示例
maptask每次度一行数据都会将数据作为参数传递给我们提给的业务接口实现类中的map方法(map(long l,String v ,context)map方法中的参数分别为,该行行首地址的偏移量,该行的数据,缓存对象),在wordcont程序中,map每次拿到maptask传递过来一行数据,首先将文本数据切分,形成单词数据,直接将(word,1)形式的数据写入context中,以便将来给reduce(context怎么缓存,后续会介绍),为什么里不做统计呢,将单行相同的单词统计一下?单行数据统计之后,任然是单行的结果,最后还得在recue中统计,避免无意义的中将统计,我们最终只在reduce中进行统计。maptask通过shuffle机制将(word,1)形式的数据发给,reduce同时保证相同的key只发往同一个reduce,这些发过来的相同key的一组数据在reduce这边落地成文件;文件中的参数如何给reduceTask的处理逻辑中共的reduce方法(reduce(k,value迭代器,context)参数分别是:一组数据的key,改组数据中的key都相同,任意一个都可以;value迭代器,可以不断的取出下一个值,context对象)。每一组(相同key)数据调用一次reduce逻辑。
2.1、 wordcount程序整体运行流程示意图
map阶段: 将每一行文本数据变成<单词,1>这样的kv数据
reduce阶段:将相同单词的一组kv数据进行聚合:累加所有的v
注意点:mapreduce程序中,
map阶段的进、出数据,
reduce阶段的进、出数据,
类型都应该是实现了HADOOP序列化框架的类型,如:
String对应Text
Integer对应IntWritable
Long对应LongWritable
2.2、 编码实现
WordcountMapper类开发
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN :是map task读取到的数据的key的类型,是一行的起始偏移量Long * VALUEIN:是map task读取到的数据的value的类型,是一行的内容String * * KEYOUT:是用户的自定义map方法要返回的结果kv数据的key的类型,在wordcount逻辑中,我们需要返回的是单词String * VALUEOUT:是用户的自定义map方法要返回的结果kv数据的value的类型,在wordcount逻辑中,我们需要返回的是整数Integer * * * 但是,在mapreduce中,map产生的数据需要传输给reduce,需要进行序列化和反序列化,而jdk中的原生序列化机制产生的数据量比较冗余,就会导致数据在mapreduce运行过程中传输效率低下 * 所以,hadoop专门设计了自己的序列化机制,那么,mapreduce中传输的数据类型就必须实现hadoop自己的序列化接口 * * hadoop为jdk中的常用基本类型Long String Integer Float等数据类型封住了自己的实现了hadoop序列化接口的类型:LongWritable,Text,IntWritable,FloatWritable * * * * * @author ThinkPad * */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 切单词 String line = value.toString(); String[] words = line.split(" "); for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }