【问题标题】:Combine output files of MapReduce job合并 MapReduce 作业的输出文件
【发布时间】:2013-12-14 08:21:09
【问题描述】:

我用 Python 编写了一个 Mapper 和 Reducer,并使用 Hadoop Streaming 在 Amazon 的 Elastic MapReduce(EMR) 上成功执行了它。

最终结果文件夹包含三个不同文件 part-00000、part-00001 和 part-00002 中的输出。但我需要将输出作为一个文件。有什么办法可以做到吗?

这是我的 Mapper 代码:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)

这是我的 Reducer 代码

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None
max_count=0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

if current_word == word:
    current_count += count
else:
    if current_word:
        # write result to STDOUT
            if current_word[0] != '@':
                print '%s\t%d' % (current_word, current_count)
                if count > max_count:
                    max_count = count
    current_count = count
    current_word = word

if current_word == word:
    print '%s\t%s' % (current_word, current_count)

我需要将其输出为一个文件。

【问题讨论】:

  • 你不能直接打开这三个文件并将它们连接成一个输出文件吗?
  • 这就是我一直在做的。但如果我能在 Reduce 阶段之后得到一个输出文件,我会很高兴。
  • 你就不能这样做吗(Linux/UNIX):cat part-00000 part-00001 part-00002 > output?
  • 谢谢詹姆斯。这是一种方式。但是我不能让 EMR 本身将它作为一个单独的部分文件吐出来?

标签: python hadoop mapreduce hadoop-streaming elastic-map-reduce


【解决方案1】:

一种非常简单的方法(假设是 Linux/UNIX 系统):

$ cat part-00000 part-00001 part-00002 > output

【讨论】:

  • 或者更好,cat part-* > output
  • 我认为最好更明确一点,不要假设它们都被称为part-* :)
【解决方案2】:

对小型数据集/处理使用 single reduce 或在作业的输出文件上使用 getmerge 选项。

【讨论】:

  • 但作业本身在 Elastic MapReduce 上运行。然后呢?
  • EMR 与否,将支持响应中提到的选项。
【解决方案3】:

我对上述问题的解决方法是执行以下 hdfs 命令:

hadoop fs -getmerge /hdfs/path local_file

其中 /hdfs/path 是包含作业输出的所有部分(部分-*****)的路径。 hadoop fs 的 -getmerge 选项会将所有作业输出合并到我们本地文件系统上的单个文件中。

【讨论】:

    【解决方案4】:

    我最近遇到了同样的问题,实际上组合器应该完成这项任务,但我无法以某种方式实现。我所做的是;

    1. 第一步:mapper1.py reducer1.py

      输入:s3://../data/

      输出 s3://..../small_output/

    2. 第二步:mapper2.py reducer2.py

      输入 s3://../data/

      输出:s3://..../output2/

    3. 第三步:mapper3.py reducer3.py

      输入:s3://../output2/

      输出:s3://..../final_output/

    我假设我们需要将 step1 的输出作为 step3 的单个文件。

    在mapper2.py的顶部,有这段代码;

    if not os.path.isfile('/tmp/s3_sync_flag'):
        os.system('touch /tmp/s3_sync_flag')
        [download files to /tmp/output/]
        os.system('cat /tmp/output/part* > /tmp/output/all')
    

    如果阻塞,检查多个映射器执行。

    【讨论】:

      猜你喜欢
      • 2016-03-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-01-25
      • 1970-01-01
      • 1970-01-01
      • 2016-02-16
      相关资源
      最近更新 更多