用途和原理

前面写MR都是使用的java,但是通常数据分析人员不一定会java,难道还要去学习下java再写MR?实际上不用,hadoop已经考虑到了这个问题,所以它提供了一个中间工具,帮助我们可以直接使用其他脚本语言比如python/ruby等语言编写MR任务,这就是hadoop streaming。

正式编写前,先看看streaming的原理。如下,hadoop streaming借助一个已经写好的MR jar来实现,包括:

  • 在每个mapper初始化时创建一个脚本进程运行脚本map任务,实际map运行时,将输入数据转成一行行数据喂给标准输入STDIN,脚本map进程读取STDIN并处理后输出一行行数据到STDOUT,实际map收集STDOUT输出数据并转成键值对输出。
  • 在每个reducer初始化时创建一个脚本进程运行脚本reduce任务,实际reduce运行时,将输入数据转成一行行数据喂给标准输入STDIN,脚本reduce进程读取STDIN并处理后输出一行行数据到STDOUT,实际reduce收集STDOUT输出数据并转成键值对输出。

工具-Hadoop Streaming

因为输入输出完全转换到标准输入输出(STDIN和STDOUT)上了,这就和使用什么语言解耦了,我们只需要按照规定方式读取和写入数据即可完成MR编写。
另外要注意一点,默认输入输出的key/value对是通过第一个Tab字符区分的,如果没有Tab,那么整行数据为Key,Value值为null。分隔符是可以修改的,如下示例:

配置 含义
“stream.map.input.field.separator=^”
“stream.num.map.input.key.fields=2”
map输入第2个^为分割符
“stream.map.output.field.separator=.”
“stream.num.map.output.key.fields=4”
map输出第4个.为分割符
“stream.reduce.input.field.separator=#”
“stream.num.reduce.input.fields=2”
reduce输入第2个#为分割符
“stream.reduce.output.field.separator=#”
“stream.num.reduce.output.fields=2”
reduce输出第2个#为分割符

程序编写

同样以WordCount为例来讲。如下,脚本map从stdin读入数据,拆分单词后写入<单词,1>键值对。

#!/usr/bin/python
# -*- coding: utf-8-*-

import sys

for line in sys.stdin:
	val = line.strip()
	words = val.split(" ")
	for w in words:
		print "%s\t%s" %(w, 1)

然后,脚本reduce将相同的key统计累加起来,如下:

#!/usr/bin/python
# -*- coding: utf-8-*-

import sys

(last_key, cnt) = (None, 0)

for line in sys.stdin:
	(key, val) = line.strip().split("\t", 1)
	if last_key and last_key!=key:
		print "%s\t%s" %(last_key, cnt)
		(last_key, cnt) = (key, int(val))
	else:
		(last_key, cnt) = (key, cnt+int(val))
	
if last_key:
	print "%s\t%s" %(last_key, cnt)

这里注意几点:

  • 和java map每次输入一条记录不同,脚本进程相当于从stdin队列读取,每次可能同时处理多条数据
  • 和java reduce自动按照key(键)组织数据不同,因为reduce接收到的数据是按照key排序的,因此脚本进程可以按照key自己来分割不同的key记录

因为脚本map、reduce完全读取标准输入输出,因此我们可以据此利用管道做本地测试,如下为word.data输入数据:

wenzhou wenwei huzhimin
wenzhou
wenwzhou
wen
wen
wenwei
wenwei wenwei wenzhou wenyongzhong
test
dada
dada dada dada wenzhou wen

运行如下命令本地测试:

cat word.data | python2.7 map.py | sort | python2.7 reduce.py 

运行结果:

dada	4
huzhimin	1
test	1
wen	3
wenwei	4
wenwzhou	1
wenyongzhong	1
wenzhou	4

实际提交任务,运行如下指令

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="PyWordCount" \
-input /home/wenzhou/test \
-output /home/wenzhou/testout \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py

运行结果和本地测试一致。注意这里几个参数含义:

  • -D 指定MR的map和reduce个数以及job名称,也可使用-jobconf来配置参数。-D是通用的,-jobconf 只适用于streaming,使用-D时,只能将其放置在所有参数的最前面,而-jobconf可以放置在任何位置。
  • -input和-output 指定MR的输入和输出目录,MR运行前输出目录必须不存在
  • -mapper和-reducer 指定运行的脚本map和reduce进程,注意对应的执行py文件必须提前上传,或者如本例一样使用-file 上传本地文件到集群分发到各个执行机上

如果使用通用参数-files需要提前上传文件到hdfs,且通用参数要放到最前,如下:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files /home/wenzhou/lib/map.py#map1.py,/home/wenzhou/lib/reduce.py \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="PyWordCount" \
-input /home/wenzhou/test \
-output /home/wenzhou/testout \
-mapper "python map1.py" \
-reducer "python reduce.py"

默认-files在当前工作目录建一个同名的链接文件,也可以使用#来指定新的链接名称。

其他参数设置

源码和参考

源码点击下载

参考官方文档

原创,转载请注明来自

相关文章:

  • 2021-10-22
  • 2022-01-08
  • 2022-02-08
  • 2022-12-23
  • 2021-05-09
  • 2022-03-08
  • 2021-11-19
  • 2021-08-15
猜你喜欢
  • 2021-12-05
  • 2021-10-12
  • 2021-05-27
  • 2021-11-06
  • 2022-02-10
  • 2022-01-14
  • 2022-12-23
相关资源
相似解决方案