Mapreduce streaming
用hadoop,作为Java界的小残废,只能紧紧抱住Streaming框架的大腿,Hadoop MapReduce和HDFS采用Java实现,默认提供Java编程接口,另外提供了C++编程接口和Streaming框架。Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。流处理嘛,keyvalue一行行的来,写个map程序分发出去,再写个reduce把一行行的数据处理掉,shuffle过程都由hadoop框架解决,轻松又愉快,开发效率和程序运行效率都很高,通过配置也方便资源控制,通过python接口调用的时候只比spark难用一点点。但同时Streaming默认只能处理文本数据,也不能处理多路数据输出。
通过脚本可以设置streaming的参数
streaming参数往往是mr中的难点所在,下面是参数列表:
|
-input |
输入数据路径 |
|
-output |
输出数据路径 |
|
-mapper |
mapper可执行程序或Java类 |
|
-reducer |
reducer可执行程序或Java类 |
|
-file |
分发本地文件 |
|
-cacheFile |
分发HDFS文件 |
|
-cacheArchive |
分发HDFS压缩文件 |
|
-numReduceTasks |
reduce任务个数 |
|
-jobconf | -D NAME=VALUE |
作业配置参数 |
|
-combiner |
Combiner Java类 |
|
-partitioner |
Partitioner Java类 |
|
-inputformat |
InputFormat Java类 |
|
-outputformat |
OutputFormat Java类 |
|
-inputreader |
InputReader配置 |
|
-cmdenv |
传给mapper和reducer的环境变量 |
|
-mapdebug |
mapper失败时运行的debug程序 |
|
-reducedebug |
reducer失败时运行的debug程序 |
|
-verbose |
详细输出模式 |
常见的作业配置参数是这样的
|
mapred.job.name |
作业名 |
|
mapred.job.priority |
作业优先级 |
|
mapred.job.map.capacity |
最多同时运行map任务数 |
|
mapred.job.reduce.capacity |
最多同时运行reduce任务数 |
|
hadoop.job.ugi |
作业执行权限 |
|
mapred.map.tasks |
map任务个数 |
|
mapred.reduce.tasks |
reduce任务个数 |
|
mapred.job.groups |
作业可运行的计算节点分组 |
|
mapred.task.timeout |
任务没有响应(输入输出)的最大时间 |
|
mapred.compress.map.output |
map的输出是否压缩 |
|
mapred.map.output.compression.codec |
map的输出压缩方式 |
|
mapred.output.compress |
reduce的输出是否压缩 |
|
mapred.output.compression.codec |
reduce的输出压缩方式 |
|
stream.map.output.field.separator |
map输出分隔符 |
比如使用-jobconf mapred.job.name='My Name'设置作业名,具体需要用什么配置,随时查官方文档或者百度google都可以,已经忘记自己查过多少遍了,记性不好我能怎么办!
Streaming 写起来是还好,但是背不出svm公式怎么好意思说自己会机器学习,不懂原理怎么能说会mr,所以在程序慢悠悠跑着的时候柒柒去查了下mapreduce的工作机制,如下:
Mapreduce原理
mapreduce在运行的过程中大致概括为5阶段
1. input阶段获取输入数据进行分片作为map的输入
2. map阶段过程对某种输入格式的一条记录解析成一条或多条记录
3. shffle阶段对中间数据的控制,作为reduce的输入
4. reduce阶段对相同key的数据进行合并
5. output阶段按照格式输出到指定目录
MapReduce中的Shuffle像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程
Spill
把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,如果内存中的数据满了就自动地spill到具有更大空间的磁盘。Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。需要一个merge过程(此过程按照partition和key排序)把这些文件进行合并
Copy
Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。
Sort
Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。
经过sort之后reduce通过sort的数据处理keyvalue,将处理结果写入在配置文件中设定的磁盘上,整个流程就结束啦~结束啦
就柒柒用过的spark和mr而言感觉差异并不是很大,存储都基于hadfs,都是流式处理数据,都需要在配置脚本中搞好配置,当你的程序出问题的时候,八成就在配置中。