本篇我们简单学习下Hadoop如何使用MapReduce进行全局排序?
首先MapReduce其过程本身中就有排序,那么其排序原理是怎样的?我们先了解其排序原理后,再考虑是否可以加以利用从而进行全局排序。
1、mapreduce中的排序原理:
hadoop中默认按照key值进行排序的,且是按照字典顺序进行排序的。
我们知道mapreduce进程中数据的流动最简单的形式就是map>reduce,
也就是说把所有数据都丢给map阶段,然后全部又扔给1个reduce,利用hadoop其shuffle机制对所有数据进行排序,再由reduce输出结果。但是这样只有1个reduce的方案和单机是没有区别的,不能体现出分布式计算的优势。
为此,我们需要引入partitioner,将map的结果送往指定的reducer,数据流动形式为:map>partition>reduce,可以实现指定多个reducer进行分布式计算,如何分多个partition并确保partition有序我们等会会结合实践来说。
我们还可以在此基础上增加一次对reduce优化的过程,即使用combiner,(有的情况不能使用),此时数据流这样:map>combine>partition>reduce过程。
2、实践
举例:两个文件a.txt & b.txt,其中的内容分别如下:
a.txt数据:
1 map
3 map
5 map
…
99 map
b.txt数据:
0 reduce
2 reduce
…
100 reduce
要实现排序后数据如下:
0 reduce
1 map
2 reduce
3 map
…
99 map
100 reduce
也就是按照第一列进行排序。有的小伙伴就想到了,这个用本地模拟实现下,感觉很简单。
ok,试一下:
本地命令:cat a.txt b.txt | sort -k1 | head 结果如下:
并不是我们想要的结果!因为默认排序是按照字符来的。
加-n可以得到正确结果:
cat a.txt b.txt | sort -k1 -n | head 结果如下:
那么如何用mapreduce框架来实现呢?
(1)仅有1个reduce,就可以保证全局按照key排序。
import sys
base_count = 10000#这里加一个大的base,保证排序顺序正确!
for line in sys.stdin:
ss = line.strip().split('\t')
key = ss[0]
val = ss[1]
new_key = base_count + int(key)
print "%s\t%s" % (new_key, val)
import sys
base_value = 10000
for line in sys.stdin:
key, val = line.strip().split('\t')
print str(int(key) - base_value) + "\t" + val
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH_A="/a.txt"
INPUT_FILE_PATH_B="/b.txt"
OUTPUT_SORT_PATH="/output_sort"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
# Step 3.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B\
-output $OUTPUT_SORT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-jobconf "mapred.reduce.tasks=1" \#设定reduce为1
-file ./map.py \
-file ./red.py \
集群运行bash run.sh:
输出结果:
最简单的方案介绍就到这里,但是这不是好的方案,只有1个reduce,完全体现不出分布式集群的优势,因此我们需要充分利用hadoop的mapreduce框架中的优点。下一篇我们再详细介绍利用partition进行全局排序的方案。
END!
谢谢阅读
欢迎关注:【大数据学习笔记】个人公众号,一起交流学习!感谢指导!