本篇我们简单学习下Hadoop如何使用MapReduce进行全局排序?
首先MapReduce其过程本身中就有排序,那么其排序原理是怎样的?我们先了解其排序原理后,再考虑是否可以加以利用从而进行全局排序。
1、mapreduce中的排序原理:
hadoop中默认按照key值进行排序的,且是按照字典顺序进行排序的。
我们知道mapreduce进程中数据的流动最简单的形式就是map>reduce,
也就是说把所有数据都丢给map阶段,然后全部又扔给1个reduce,利用hadoop其shuffle机制对所有数据进行排序,再由reduce输出结果。但是这样只有1个reduce的方案和单机是没有区别的,不能体现出分布式计算的优势。
为此,我们需要引入partitioner,将map的结果送往指定的reducer,数据流动形式为:map>partition>reduce,可以实现指定多个reducer进行分布式计算,如何分多个partition并确保partition有序我们等会会结合实践来说。
我们还可以在此基础上增加一次对reduce优化的过程,即使用combiner,(有的情况不能使用),此时数据流这样:map>combine>partition>reduce过程。

2、实践

举例:两个文件a.txt & b.txt,其中的内容分别如下:

a.txt数据:

1 map

3 map

5 map

99 map

b.txt数据:

0 reduce

2 reduce

100 reduce

要实现排序后数据如下:

0 reduce

1 map

2 reduce

3 map

99 map

100 reduce

也就是按照第一列进行排序。有的小伙伴就想到了,这个用本地模拟实现下,感觉很简单。

ok,试一下:

本地命令:cat a.txt b.txt | sort -k1 | head 结果如下:
No.8大数据入门 | MR实践之--全局排序一
并不是我们想要的结果!因为默认排序是按照字符来的。

加-n可以得到正确结果:

cat a.txt b.txt | sort -k1 -n | head 结果如下:
No.8大数据入门 | MR实践之--全局排序一

那么如何用mapreduce框架来实现呢?

(1)仅有1个reduce,就可以保证全局按照key排序。

map.py:

import sys

base_count = 10000#这里加一个大的base,保证排序顺序正确!

for line in sys.stdin:

    ss = line.strip().split('\t')

    key = ss[0]

    val = ss[1]

    new_key = base_count + int(key)

    print "%s\t%s" % (new_key, val)

red.py:

import sys

base_value = 10000

for line in sys.stdin:

    key, val = line.strip().split('\t')

    print str(int(key) - base_value) + "\t" + val

run.sh:

 HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
    
    INPUT_FILE_PATH_A="/a.txt"
    
    INPUT_FILE_PATH_B="/b.txt"
    
    OUTPUT_SORT_PATH="/output_sort"
    
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_SORT_PATH
    
    # Step 3.
    
    $HADOOP_CMD jar $STREAM_JAR_PATH \
    
        -input $INPUT_FILE_PATH_A,$INPUT_FILE_PATH_B\
    
        -output $OUTPUT_SORT_PATH \
    
        -mapper "python map.py" \
    
        -reducer "python red.py" \
    
        -jobconf "mapred.reduce.tasks=1" \#设定reduce为1
    
        -file ./map.py \
    
        -file ./red.py \

集群运行bash run.sh:

输出结果:
No.8大数据入门 | MR实践之--全局排序一
最简单的方案介绍就到这里,但是这不是好的方案,只有1个reduce,完全体现不出分布式集群的优势,因此我们需要充分利用hadoop的mapreduce框架中的优点。下一篇我们再详细介绍利用partition进行全局排序的方案。
END!
谢谢阅读
欢迎关注:【大数据学习笔记】个人公众号,一起交流学习!感谢指导!
No.8大数据入门 | MR实践之--全局排序一

相关文章: