用途和原理
前面写MR都是使用的java,但是通常数据分析人员不一定会java,难道还要去学习下java再写MR?实际上不用,hadoop已经考虑到了这个问题,所以它提供了一个中间工具,帮助我们可以直接使用其他脚本语言比如python/ruby等语言编写MR任务,这就是hadoop streaming。
正式编写前,先看看streaming的原理。如下,hadoop streaming借助一个已经写好的MR jar来实现,包括:
- 在每个mapper初始化时创建一个脚本进程运行脚本map任务,实际map运行时,将输入数据转成一行行数据喂给标准输入STDIN,脚本map进程读取STDIN并处理后输出一行行数据到STDOUT,实际map收集STDOUT输出数据并转成键值对输出。
- 在每个reducer初始化时创建一个脚本进程运行脚本reduce任务,实际reduce运行时,将输入数据转成一行行数据喂给标准输入STDIN,脚本reduce进程读取STDIN并处理后输出一行行数据到STDOUT,实际reduce收集STDOUT输出数据并转成键值对输出。
因为输入输出完全转换到标准输入输出(STDIN和STDOUT)上了,这就和使用什么语言解耦了,我们只需要按照规定方式读取和写入数据即可完成MR编写。
另外要注意一点,默认输入输出的key/value对是通过第一个Tab字符区分的,如果没有Tab,那么整行数据为Key,Value值为null。分隔符是可以修改的,如下示例:
| 配置 | 含义 |
|---|---|
| “stream.map.input.field.separator=^” “stream.num.map.input.key.fields=2” |
map输入第2个^为分割符 |
| “stream.map.output.field.separator=.” “stream.num.map.output.key.fields=4” |
map输出第4个.为分割符 |
| “stream.reduce.input.field.separator=#” “stream.num.reduce.input.fields=2” |
reduce输入第2个#为分割符 |
| “stream.reduce.output.field.separator=#” “stream.num.reduce.output.fields=2” |
reduce输出第2个#为分割符 |
程序编写
同样以WordCount为例来讲。如下,脚本map从stdin读入数据,拆分单词后写入<单词,1>键值对。
#!/usr/bin/python
# -*- coding: utf-8-*-
import sys
for line in sys.stdin:
val = line.strip()
words = val.split(" ")
for w in words:
print "%s\t%s" %(w, 1)
然后,脚本reduce将相同的key统计累加起来,如下:
#!/usr/bin/python
# -*- coding: utf-8-*-
import sys
(last_key, cnt) = (None, 0)
for line in sys.stdin:
(key, val) = line.strip().split("\t", 1)
if last_key and last_key!=key:
print "%s\t%s" %(last_key, cnt)
(last_key, cnt) = (key, int(val))
else:
(last_key, cnt) = (key, cnt+int(val))
if last_key:
print "%s\t%s" %(last_key, cnt)
这里注意几点:
- 和java map每次输入一条记录不同,脚本进程相当于从stdin队列读取,每次可能同时处理多条数据
- 和java reduce自动按照key(键)组织数据不同,因为reduce接收到的数据是按照key排序的,因此脚本进程可以按照key自己来分割不同的key记录
因为脚本map、reduce完全读取标准输入输出,因此我们可以据此利用管道做本地测试,如下为word.data输入数据:
wenzhou wenwei huzhimin
wenzhou
wenwzhou
wen
wen
wenwei
wenwei wenwei wenzhou wenyongzhong
test
dada
dada dada dada wenzhou wen
运行如下命令本地测试:
cat word.data | python2.7 map.py | sort | python2.7 reduce.py
运行结果:
dada 4
huzhimin 1
test 1
wen 3
wenwei 4
wenwzhou 1
wenyongzhong 1
wenzhou 4
实际提交任务,运行如下指令
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="PyWordCount" \
-input /home/wenzhou/test \
-output /home/wenzhou/testout \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py
运行结果和本地测试一致。注意这里几个参数含义:
- -D 指定MR的map和reduce个数以及job名称,也可使用-jobconf来配置参数。-D是通用的,-jobconf 只适用于streaming,使用-D时,只能将其放置在所有参数的最前面,而-jobconf可以放置在任何位置。
- -input和-output 指定MR的输入和输出目录,MR运行前输出目录必须不存在
- -mapper和-reducer 指定运行的脚本map和reduce进程,注意对应的执行py文件必须提前上传,或者如本例一样使用-file 上传本地文件到集群分发到各个执行机上
如果使用通用参数-files需要提前上传文件到hdfs,且通用参数要放到最前,如下:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files /home/wenzhou/lib/map.py#map1.py,/home/wenzhou/lib/reduce.py \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="PyWordCount" \
-input /home/wenzhou/test \
-output /home/wenzhou/testout \
-mapper "python map1.py" \
-reducer "python reduce.py"
默认-files在当前工作目录建一个同名的链接文件,也可以使用#来指定新的链接名称。
其他参数设置
源码和参考
源码点击下载
参考官方文档
原创,转载请注明来自