【问题标题】:Not getting my expected output in mapreduce using python code使用 python 代码在 mapreduce 中没有得到我的预期输出
【发布时间】:2019-12-09 15:28:49
【问题描述】:

运行此代码以在 Hadoop 集群中获取大约 10k+ CSV 文件中的数据的概率。 我正在使用 Google DataProc Cluster 来运行此代码。请告诉我如何获得预期的输出。最后一件事可能是逻辑问题或功能问题。

#!/usr/bin/env python3
"""mapper.py"""
import sys

# Get input lines from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()

    # Split it into tokens
    #tokens = line.split()

    #Get probability_mass values
    for probability_mass in line:
        print("None\t{}".format(probability_mass))
#!/usr/bin/env python3
"""reducer.py"""
import sys
from collections import defaultdict


counts = defaultdict(int)

# Get input from stdin
for line in sys.stdin:
    #Remove spaces from beginning and end of the line
    line = line.strip()

    # skip empty lines
    if not line:
        continue  

    # parse the input from mapper.py
    k,v = line.split('\t', 1)
    counts[v] += 1

total = sum(counts.values())
probability_mass = {k:v/total for k,v in counts.items()}
print(probability_mass)

我的 CSV 文件如下所示。

probability_mass
10
10
60
10
30
Expected output Probability of each number

{10: 0.6, 60: 0.2, 30: 0.2}

but result still show like this 
{1:0} {0:0} {3:0} {6:0} {1:0} {6:0}

我会将这个命令保存在 nano 中,然后运行它。

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options=-n \
-files mapper.py,reducer.py \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-input /tmp/data.csv \
-output /tmp/output

【问题讨论】:

  • 通过为每个映射输出提供相同的键,您可以保证它们都命中同一个 reducer,从而消除了 MapReduce 的要点。您需要事先进行一项单独的工作来计算记录总数(或者省去麻烦并使用 Spark 之类的工具)。
  • @Ben Even Spark 会强制执行此操作
  • 请说明你是如何运行这段代码和你的输入文件的
  • @cricket_007 是的,但他们可能会发现以一种无法做到这一点的方式编写更容易。

标签: python python-3.x hadoop mapreduce hadoop-streaming


【解决方案1】:

您将行拆分为单个字符,这可以解释为什么您将 1、3、6、0 等作为映射键。

不要循环,只打印值的行;你的映射器不需要超过这个

import sys
for line in sys.stdin:
    print("None\t{}".format(line.strip()))

然后,在 reducer 中,您将一个 int 除以一个更大的 int,这导致向下舍入到最接近的 int,即 0。

您可以通过将字典更改为存储浮点数来解决此问题

counts = defaultdict(float)

或将总和设为浮点数

total = float(sum(counts.values()))

如前所述,这不是 Hadoop 问题,因为您可以在本地对其进行测试和调试

cat data.txt | python mapper.py | sort -n | python reducer.py

【讨论】:

  • 现在很好,我得到了概率。但是 {1\n: 0.6245, 6\n: 0.2543, 3\n: 0.2345} 这是什么\n
  • 这是一个新行字符,你没有剥离
  • 仅此 {1\n: 0.6245, 6\n: 0.2543, 3\n: 0.2345}。我期待这样 {10: 0.6245, 60: 0.2543, 30: 0.2345}
  • 我知道你想要什么。正如我所说,您不会在读取文件、获取行和将键放入字典之间的某处剥离换行符。例如,您可以使用counts[int(v.strip())] += 1,但在此之前已经有一条条,因此不需要它
猜你喜欢
  • 1970-01-01
  • 2020-09-01
  • 2022-01-22
  • 1970-01-01
  • 2017-01-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多