【问题标题】:Facing issue in Mapper.py and Reducer.py when running code in Hadoop cluster在 Hadoop 集群中运行代码时,Mapper.py 和 Reducer.py 面临问题
【发布时间】:2019-12-02 13:33:01
【问题描述】:

运行此代码以在 Hadoop 集群中获取 CSV 文件中的数据。

当我在集群中运行此代码时收到此错误“java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1”任何人修复我的代码。

#!/usr/bin/env python3
"""mapper.py"""
import sys

# Get input lines from stdin
for line in sys.stdin:
    # Remove spaces from beginning and end of the line
    line = line.strip()

    # Split it into tokens
    #tokens = line.split()

    #Get probability_mass values
    for probability_mass in line:
        print(str(probability_mass)+ '\t1')
#!/usr/bin/env python3
"""reducer.py"""
import sys
from collections import defaultdict


counts = defaultdict(int)

# Get input from stdin
for line in sys.stdin:
    #Remove spaces from beginning and end of the line
    line = line.strip()

    # skip empty lines
    if not line:
        continue  

    # parse the input from mapper.py
    k,v = line.split('\t', 1)
    counts[v] += 1

total = sum(counts.values())
probability_mass = {k:v/total for k,v in counts.items()}
print(probability_mass)
marks
10
10
60
10
30
Expected output Probability of each number

{10: 0.6, 60: 0.2, 30: 0.2}

but result still show like this 
{1:1} {1:1} {1:1} {1:1} {1:1} {1:1}

【问题讨论】:

    标签: python python-3.x hadoop mapreduce hadoop-streaming


    【解决方案1】:

    真正的错误应该在在 YARN UI 中可用,但是将概率作为键将不允许您一次对所有值求和,因为它们最终都会出现在不同的减速器中

    如果您没有将值分组的键,那么您可以使用它,它将所有数据集中到一个 reducer

    print('%s\t%s' % (None, probability_mass))

    这是您想要的输出的工作示例,我仅使用输入文件进行了测试,而不是在 Hadoop 中

    import sys
    from collections import defaultdict
    
    counts = defaultdict(int)
    
    # Get input from stdin
    for line in sys.stdin:
        #Remove spaces from beginning and end of the line
        line = line.strip()
    
        # skip empty lines
        if not line:
            continue  
    
        # parse the input from mapper.py
        k,v = line.split('\t', 1)
        counts[v] += 1
    
    total = float(sum(counts.values()))
    probability_mass = {k:v/total for k,v in counts.items()}
    print(probability_mass)
    

    输出

    {'10': 0.6, '60': 0.2, '30': 0.2}
    

    您可以使用 cat file.txt | python mapper.py | sort -u | python reducer.py 在不使用 Hadoop 的情况下测试您的代码

    另外,mrjob 或 pyspark 是更高级的语言,可以提供更多有用的功能

    【讨论】:

    • 怎么样? reducer 中的ClassA 将始终分配给该映射器的最后一个值
    • 这是我的错误。最后会是 print ('%s\t%s') % (probability_mass, Classprob[probability_mass]) in reducer
    • mapreduce 中不能有标题。您应该在问题中包含您的示例数据集(前 20 行)和预期输出
    • 我已附上数据集和预期输出。
    • 注意:您的问题不适用于 mapreduce,因为您必须提前知道所有值的总和。因此,您必须从映射器输出(None, ClassA)。而且您还没有发布来自 YARN UI 的实际错误。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-01-21
    • 1970-01-01
    • 2022-07-04
    • 2012-07-09
    • 2017-02-07
    • 2020-08-09
    • 2021-04-11
    相关资源
    最近更新 更多