目录
ODPS提供了MapReduce编程接口。用户可以使用MapReduce提供的接口(Java API)编写MapReduce程序处理ODPS的中的数据。本章节只对MapReduce SDK的使用方法作简单的介绍,关于MapReduce SDK的详细说明请参考官方提供的Java Doc。https://help.aliyun.com/product/27797.html?spm=a2c4g.11186623.6.540.301278a8vYxpDB
应用场景
MapReduce最早是由Google提出的分布式数据处理模型,随后受到了业内的广泛关注,并被大量应用到各种商业场景中。比如:
- 搜索:网页爬取、倒排索引、PageRank。
- Web访问日志分析:分析和挖掘用户在web上的访问、购物行为特征,实现个性化推荐;分析用户访问行为。
- 文本统计分析:比如莫言小说的WordCount、词频TFIDF分析;学术论文、专利文献的引用分析和统计;维基百科数据分析等。
- 海量数据挖掘:非结构化数据、时空数据、图像数据的挖掘。 机器学习:监督学习、无监督学习、分类算法如决策树、SVM等。
- 自然语言处理:基于大数据的训练和预测;基于语料库构建单词同现矩阵,频繁项集数据挖掘、重复文档检测等。
- 广告推荐:用户点击(CTR)和购买行为(CVR)预测。
处理流程
MapReduce处理数据过程主要分成2个阶段:Map阶段和Reduce阶段。首先执行Map阶段,再执行Reduce阶段。Map和Reduce的处理逻辑由用户自定义实现,但要符合MapReduce框架的约定。
- 在正式执行Map前,需要将输入数据进行”分片”。所谓分片,就是将输入数据切分为大小相等的数据块,每一块作为单个Map Worker的输入被处理,以便于多个Map Worker同时工作。
- 分片完毕后,多个Map Worker就可以同时工作了。每个Map Worker在读入各自的数据后,进行计算处理,最终输出给Reduce。Map Worker在输出数据时,需要为每一条输出数据指定一个Key。这个Key值决定了这条数据将会被发送给哪一个Reduce Worker。Key值和Reduce Worker是多对一的关系,具有相同Key的数据会被发送给同一个Reduce Worker,单个Reduce Worker有可能会接收到多个Key值的数据。
- 在进入Reduce阶段之前,MapReduce框架会对数据按照Key值排序,使得具有相同Key的数据彼此相邻。如果用户指定了”合并操作”(Combiner),框架会调用Combiner,将具有相同Key的数据进行聚合。Combiner的逻辑可以由用户自定义实现。与经典的MapReduce框架协议不同,在ODPS中,Combiner的输入、输出的参数必须与Reduce保持一致。这部分的处理通常也叫做”洗牌”(Shuffle)。
- 接下来进入Reduce阶段。相同的Key的数据会到达同一个Reduce Worker。同一个Reduce Worker会接收来自多个Map Worker的数据。每个Reduce Worker会对Key相同的多个数据进行Reduce操作。最后,一个Key的多条数据经过Reduce的作用后,将变成了一个值。
备注:
上文仅是对MapReduce框架做简单介绍,更多相关信息请查阅其他资料
下面将以WordCount为例,解释ODPS MapReduce各个阶段的概念。 假设存在一个文本a.txt,文本内每行是一个数字,我们要统计每个数字出现的次数。文本内的数字称为Word,数字出现的次数称为Count。如果ODPS Mapreduce完成这一功能,需要经历下图描述的几个步骤:
- 首先对文本进行分片,将每片内的数据作为单个Map Worker的输入;
- Map处理输入,每获取一个数字,将数字的Count设置为1,并将此<Word, Count>对输出,此时以Word作为输出数据的Key;
- 在Shuffle阶段前期,首先对每个Map Worker的输出,按照Key值,即Word值排序。排序后进行Combine操作,即将Key值(Word值)相同的Count累加,构成一个新的<Word, Count>对。此过程被称为合并排序;
- 在Shuffle阶段后期,数据被发送到Reduce端。Reduce Worker收到数据后依赖Key值再次对数据排序;
- 每个Reduce Worker对数据进行处理时,采用与Combiner相同的逻辑,将Key值(Word值)相同的Count累加,得到输出结果;
备注:
由于ODPS的所有数据都被存放在表中,因此ODPS MapReduce的输入、输出只能是表,不允许用户自定义输出格式,不提供类似文件系统的接口。
传统的MapReduce模型要求每一轮MapReduce操作之后,数据必须落地到分布式文件系统上(比如HDFS或 MaxCompute 表)。而一般的MapReduce应用通常由多个MapReduce作业组成,每个作业结束之后需要写入磁盘,接下去的Map任务很多情况下只是读一遍数据,为后续的Shuffle阶段做准备,这样其实造成了冗余的IO操作。
扩展MapReduce
MaxCompute 的计算调度逻辑可以支持更复杂编程模型, 针对上面的那种情况,可以在Reduce后面直接执行下一次的Reduce操作,而不需要中间插入一个Map操作。基于此,MaxCompute 提供了扩展的MapReduce模型,即可以支持Map后连接任意多个Reduce操作,比如Map->Reduce->Reduce。
Hadoop Chain Mappper/Reducer也支持类似的串行化Map或Reduce操作,但和MaxCompute的扩展MapReduce(MR2)模型有本质的区别,因为Chain Mapper/Reducer还是基于传统的MapReduce模型,只是可以在原有的Mapper或Reducer后面在增加一个或多个Mapper操作(不允许增加Reducer)。这带来的好处是用户可以复用之前的Mapper业务逻辑,可以把一个Map或Reduce拆成多个Mapper阶段,但本质上并没有改变底层的调度和I/O模型。
与 MaxCompute MapReduce相比,MR2在Map/Reducedeng 等函数编写方式上基本一致。较大的不同点发生在作业时。更多详细信息可参考扩展MapReduce示例。
算了不写了,官方文档地址如下:
MaxCompute > 开发 > MapReduce > 概要 > MapReduce概述