批处理引擎MapReduce程序设计
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.MapReduce API
Hadoop同时提供了新旧两套MapReduce API,新API在就API基础上进行了封装,使得其在扩展性和易用性方面哥哥好。总结新就版本MapReduce API主要区别如下: (1)存放位置 旧版本API放在“org.apache.hadoop.mapred”包中,而新版API则放在“org.apache.hadoop.mapreduce”包及其子包中。 (2)接口类为抽象类 接口通常作为一种严格的“协议约束”,它只有方法声明但没有方法实现,且要求所有实现类(不包括抽象类)必须实现接口的每一个方法。接口的最大优点是允许一个类实现多种接口,进而实现类似C++中的“多重继承”。
抽象类则是一种比较宽松的“协议约束”,它可为某些方法提供默认实现,而继承类则可选择是否重新实现这些方法,正式因为这一点,抽象类在类衍化方面更具有优势,也就是说,抽象类具有良好的向后兼容性,当需要为抽象添加新的方法时,只要新添加的方法提供类默认实现,用户之前的代码就不必修改了。 考虑到抽象类在API衍化方面的优势,新API将InputFormat,OutputFormat,Mapper,Reducer和Partition由接口变为抽象类。 (3)上下文封装
新版本API将变量和函数封装成各种上下文(Contex)类,使得API具有更好的易用性和扩展性。首先,函数参数列表经封装后变短,使得函数更容易使用;其次,当需要修改或添加某些变量或函数时,只需要修改封装后的上下文即可,用户代码无需修改,这样保证了向后兼容性,具有良好的扩展性。
由于新版本和旧版本API在类层次结构,编程接口名称及对应的参数列表等方面存在较大差别,所以两种API不能够兼容。所以建议大家直接使用新的API进行程序开发。
二.MapReduce程序设计基础
Hadoop内核是采用Java语言开发的,提供Java API是自然而然的事情。一般而言,用户可按照以下几个步骤开发MapReduce应用程序: (1)实现Mapper,Reducer以及main函数。通过继承抽象类Mapper和Reducer实现自己的数据处理逻辑,并在main函数中创建Job,定制作业执行环境。 (2)本地调试。在本地运行应用程序,让程序读取本地数据,并写到本地,以便调试。 (3)分布式执行。将应用程序提交到Hadoop集群中,以便分布式处理HDFS中的数据。 接下来介绍介绍几个Java 程序设计实例,帮助打下理解MapReduce应用程序开发流出。
1>.构建倒排索引
倒排索引(Inverted index),也常被称为反向索引,是一种索引方法,通常用于快速全文搜索某个词语所在文档或者文档中的具体存储位置。它是文档检索系统中最常用的数据结构,也是搜索引擎中最核心大的技术之一。目前主要有两种不同的反向索引形式: (1)一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表。 (2)一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置。 第二种方式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建。本实力主要介绍第一种方式。
以英文为例,下面是要被索引的文本:
T0 = "I wish to wish the wish you wish to wish"
T1 = "but if you wish the wish the witch wishes"
T2 = "I won't wish the wish you wish to wish"
我们就能得到下面的反向索引:
"I" : {0,2}
"wish" : {0,1,2}
"to" : {0,2}
"the" : {0,1,2}
"you" : {0,1,2}
"but" : {1}
"if" : {1}
"witch" : {1}
"wont" : {2}
检索的条件"I","wish"和“you”将对应这个集合:{0,2} ∩ {0,1,2} ∩ {0,1,2} = {0,2}。
采用MapReduce实现倒排索引需实现三个基本组件:Mapper,Combiner和Reducer,如上图所示,具体如下: (1)Mapper Mapper过程分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词,文档URI和词频(作为权重),其中,单词和文档URI为输出key,词频作为value。 (2)Combiner 统计词频,输出key为单词,输出value为文档URI和词频。 (3)Reducer 将相同key值的value值组合成倒排索引文件所需的格式。
将上面过程转化成代码,则程序框架如下所示:
下面分别介绍InvertedIndexMapper,InvertedIndexCominer和InvertedIndexReducer三个内部类的实现。
InvertedIndexMapper内部类实现如下:
InvertedIndexCombiner内部类实现如下:
InvertedIndexReducer内部类实现如下:
MapReduce应用程序设计完成后,可直接在IDE中运行,此时需设置两个本地目录作为程序的输入,分别是输入数据所在目录和输出数据存放目录。
通过本地运行确认程序程序逻辑确认后,可通过“haoop jar”命令将MapReducer作业提交到hadoop集群中,同时,“-D”指定作业运行参数,包括Map Task使用内存量,Reduce Task个数等,如下所示:
hadoop jar revertedIndex.jar java.package.name.InvertedIndex -D mapreduce.map.memory.mb=4096 -D mapreduce.map.java.opts=-Xms2560M -D mapreduce.job.reduces=4 /input/data /outout/data
2>.SQL GroupBy
给定数据表order,保存了交易数据,包括交易号dealid,用户ID,交易时间以及交易金额等,定义如下: crate table order( dealid long NOT NULL, uid long NOT NULL, dealdate date NOT NUll, amount long NOT NULL ) 交易数据量比较大,为TB级别,保存在大量文本文件中,没行保存一条交易数据,不同字段通过“,”分隔,形式如下: 000001,12054,2015-01-01,1200 000002,12090,2015-01-01,2500 000003,13000,2015-01-02,800 ...... 请问,如何编写MapReduce程序得到以下SQL产生的结果: SELECT dealid,count(distinct uid) num from order group by dealid; 一种简单的方案是,在Mapper中,将dealid和uid分别作为key和value输出,在Reducer中,借助Java中的Map数据结构设计同一dealid中不同uid数目。该方法的缺点是Reducer中内存使用量是不可控的,极有可能发生内存溢出。 另一种方案是借助MapReduce的排序功能完成uid的去重,计算过程如下图所示:
将上面过程转换成代码,则程序框架如下所示:
下面分别介绍SqlGroupByMapper,SqlGroupByPartitioner和SqlGroupByReducer三个内部类的实现。
SqlGroupByMapper内部类实现如下:
SqlGroupByPartitioner内部类实现如下:
SqlGroupByReducer内部类实现如下:
三.MapReduce程序设计进阶
MapReduce提供了很多高级功能,使用户更容易开发高效的分布式程序,这些功能包括数据压缩,多路输入/输出,组合主键以及DistributedCache等,本节将一次介绍这些功能。
1>.数据压缩
冷热数据是根据最近公司访问时间确定的,一般而言,认为最近X天内未访问过的数据为冷数据,其中X的大小视公司情况而定,比如100或者365天。可通过分析NameNode日志得到HDFS上冷数据和热数据。(这种方式是可行,但我不推荐大家使用,因为配置文件是基于XML写的,你如果一条一条去查的话,会有一种欲哭无泪的感觉)