【Question title】: Pipe command works but MapReduce does not
【Posted】: 2016-02-16 19:41:17
【Question】:

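I have to join 6 sets of data that describe the view counts of certain TV shows on different channels. Three of the six sets contain a list of shows and the number of views for each, for example: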

Show_Name 201

Another_Show 105

and so on.

The other three sets contain the shows and the channel each one airs on, for example:

Show_Name ABC

Another_Show CNN

and so on.

I wrote the following mapper in Python to find the shows on channel ABC:

#!/usr/bin/env python
import sys

all_shows_views = []
shows_on_ABC = []

for line in sys.stdin:
    line       = line.strip()   #strip out carriage return (i.e. removes line breaks).
    key_value  = line.split(",")   #split line into key and value, returns a list.
    key_in     = key_value[0]     #.split(" ") - Dont need the split(" ") b/c there is no date. 
    value_in   = key_value[1]     #value is 2nd item. 

    if value_in.isdigit():               #a numeric value means this is a view-count record.
        show = key_in
        all_shows_views.append(show + "\t" + value_in)
    if value_in == "ABC":                #check if the TV show is on ABC.
        show = key_in
        shows_on_ABC.append(show)

for i in range(len(all_shows_views)):
    show_view = all_shows_views[i].split("\t")
    for c in range(len(shows_on_ABC)):
        if show_view[0] == shows_on_ABC[c]:
            print (show_view[0] + "\t" + show_view[1])

#Note that Hadoop expects a tab to separate key value
#but this program assumes the input file has a ',' separating key value.

The mapper passes on only the names and view counts of the shows on ABC, for example:

Show_name_on_ABC 120

The reducer, also in Python, is as follows:

import sys

prev_show          = "  "    #initialize previous word  to blank string
line_cnt           = 0      #count input lines.
count            = 0        #keep running total.

for line in sys.stdin:
    line       = line.strip()           #strip out carriage return
    key_value  = line.split('\t')       #split line, into key and value, returns a list
    line_cnt   = line_cnt+1   
    curr_show  = key_value[0]             #key is first item in list, indexed by 0
    value_in   = key_value[1]             #value is 2nd item

    if curr_show != prev_show and line_cnt>1:
        #print "\n"
        #print "---------------------Total---------------------"
        #print "\n"
        print (prev_show + "\t" + str(count))
        #print "\n"
        #print "------------------End of Item------------------"
        #print "\n"
        count = 0
    else:
        count = count + int(key_value[1])
        #print key_value[0] + "\t" + key_value[1]

    prev_show = curr_show  #set up previous show for the next set of input lines.

print (curr_show + "\t" + str(count))

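The reducer takes the list of shows on ABC with their view counts, keeps a running count for each show, and prints the total for each show (Hadoop automatically sorts the data alphabetically by key, which in this case is the show name).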
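For comparison, the "running total per key" idea described above can be sketched with `itertools.groupby`. This is not the code from the question, only a minimal illustration; the function name `sum_by_key` and the sample rows are hypothetical, and it assumes the input is already sorted by show and every line is a well-formed `show<TAB>integer` pair.

```python
from itertools import groupby


def sum_by_key(lines):
    """Yield (show, total) for tab-separated lines that are sorted by show."""
    # Skip lines without a tab so a stray blank line does not break the split.
    pairs = (line.rstrip("\r\n").split("\t", 1) for line in lines if "\t" in line)
    # groupby only merges adjacent equal keys, hence the sorted-input assumption.
    for show, group in groupby(pairs, key=lambda kv: kv[0]):
        yield show, sum(int(value) for _, value in group)


# Hypothetical sample rows, not taken from the real data set.
sample = ["Show_A\t10\n", "Show_A\t20\n", "Show_B\t5\n"]
print(list(sum_by_key(sample)))  # [('Show_A', 30), ('Show_B', 5)]
```

In a streaming script the same function could be fed `sys.stdin` instead of the sample list. An empty input simply yields nothing, so no variable is left unassigned at the end.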

When I run it in the terminal with a pipe command, like this:

cat Data*.text | /home/cloudera/mapper.py | sort | /home/cloudera/reducer.py

I get tidy output with the correct totals, as follows:

Almost_Games 49237

Almost_News 45589

Almost_Show 49186

Baked_Games 50603

When I run the same problem in the terminal through Hadoop, using the following command:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
   -input /user/cloudera/input \
   -output /user/cloudera/output_join \
   -mapper /home/cloudera/mapper.py \
   -reducer /home/cloudera/reducer.py

the job fails, and the reducer is the culprit. The full error is as follows:

15/11/15 09:16:54 INFO mapreduce.Job: Job job_1447598349691_0003 failed with state FAILED due to: Task failed task_1447598349691_0003_r_000000
Job failed as tasks failed. failedMaps:0 failedReduces:1

15/11/15 09:16:54 INFO mapreduce.Job: Counters: 37
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=674742
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=113784
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=18
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters 
        Failed reduce tasks=4
        Launched map tasks=6
        Launched reduce tasks=4
        Data-local map tasks=6
        Total time spent by all maps in occupied slots (ms)=53496
        Total time spent by all reduces in occupied slots (ms)=18565
        Total time spent by all map tasks (ms)=53496
        Total time spent by all reduce tasks (ms)=18565
        Total vcore-seconds taken by all map tasks=53496
        Total vcore-seconds taken by all reduce tasks=18565
        Total megabyte-seconds taken by all map tasks=54779904
        Total megabyte-seconds taken by all reduce tasks=19010560
    Map-Reduce Framework
        Map input records=6600
        Map output records=0
        Map output bytes=0
        Map output materialized bytes=36
        Input split bytes=729
        Combine input records=0
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=452
        CPU time spent (ms)=4470
        Physical memory (bytes) snapshot=1628909568
        Virtual memory (bytes) snapshot=9392836608
        Total committed heap usage (bytes)=1279262720
    File Input Format Counters 
        Bytes Read=113055
15/11/15 09:16:54 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

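Why does the pipe command work when the Hadoop execution does not?

【Comments】:

  • You need to look up the logs for the specific attempt that failed. See stackoverflow.com/questions/3207238/…
  • I went to the logs and found this error detail: '2015-11-18 11:00:39,934 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed! java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1:' Any ideas?
  • I did some further digging, and the error points to the last print line of the reducer, saying that the variable "curr_show" is not defined (with the pipe command, it is). After removing that line there is no error at all, but nothing is written to the output file?
  • I don't know Python, but it looks like curr_show is only defined within the scope of the for loop, and the print statement is outside it?
  • I am also relatively new to Python, and I think you are right to assume that the variable is only valid inside the for loop. I declared it outside the loop and the file then ran without errors. But the file it writes is empty?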
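One possible explanation for the empty output, sketched under assumptions: the counters in the question report `Map output records=0` with 6 launched map tasks. If each map task receives only one of the six input files (an assumption, though the task count matches the number of data sets), the mapper never holds a views file and a channel file in memory at the same time, so its in-memory join matches nothing. With `cat`, a single mapper process sees everything. The function name `join_in_mapper` and the sample rows below are hypothetical, and the logic is a simplified stand-in for the mapper in the question.

```python
def join_in_mapper(lines):
    """Buffer all records, then join view counts against the ABC shows in memory."""
    views, on_abc = [], set()
    for line in lines:
        parts = line.strip().split(",")
        if len(parts) != 2:
            continue  # ignore malformed lines
        show, value = parts
        if value.isdigit():
            views.append((show, value))   # view-count record
        elif value == "ABC":
            on_abc.add(show)              # channel record for ABC
    return [show + "\t" + value for show, value in views if show in on_abc]


# Hypothetical sample data standing in for two of the six input files.
views_file = ["Show_A,10", "Show_B,20"]
channels_file = ["Show_A,ABC", "Show_B,CNN"]

# Pipe: `cat` feeds every file to one mapper process.
piped = join_in_mapper(views_file + channels_file)

# Streaming (assumed): each map task runs the script on its own file.
per_split = [join_in_mapper(f) for f in (views_file, channels_file)]

print(piped)      # ['Show_A\t10']
print(per_split)  # [[], []]
```

If that is what happens on the cluster, the join has to take place after the shuffle, where all records for one show arrive at the same reducer, rather than inside the mapper.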

Tags: python hadoop mapreduce hadoop-streaming


【Solution 1】:

This mapper and reducer still do not work. I get the following exception. Does anyone see the problem?

The command used for this is:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar  -input     /user/cloudera/input_join -output /user/cloudera/output_join2 -mapper '/home/cloudera/join2.mapper.py' -reducer '/home/cloudera/join2.reducer.py'

Error log:

FATAL [IPC Server handler 5 on 51645] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1449644802746_0003_m_000001_0 - exited : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

【Comments】:

    【Solution 2】:

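    The reducer's Python script raises an error because the variable curr_show is only assigned inside the for loop that reads the lines. As far as I can tell, the error shows up only with the Hadoop command and not with the pipe command because of scoping (I am very new to this): if the loop body never runs, curr_show is never assigned.

    Declaring the curr_show variable outside the for loop lets the final print command execute.

    import sys

    prev_show          = "  "    #initialize previous word  to blank string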
    line_cnt           = 0      #count input lines.
    count              = 0        #keep running total.
    curr_show          = "  "
    
    for line in sys.stdin:
        line       = line.strip()           #strip out carriage return
        key_value  = line.split('\t')       #split line, into key and value, returns a list
        line_cnt   = line_cnt+1   
        curr_show  = key_value[0]             #key is first item in list, indexed by 0
        value_in   = key_value[1]             #value is 2nd item
    
        if curr_show != prev_show and line_cnt>1:
            #print "\n"
            #print "---------------------Total---------------------"
            #print "\n"
            print (prev_show + "\t" + str(count))
            #print "\n"
            #print "------------------End of Item------------------"
            #print "\n"
            count = int(value_in)
        else:
            count = count + int(key_value[1])
            #print key_value[0] + "\t" + key_value[1]
    
        prev_show = curr_show  #set up previous show for the next set of input lines.
    
    print (curr_show + "\t" + str(count))
    

    In addition, the count variable now resets to the current value_in, so that the value on the line where the show changes is not lost.
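A small sketch of why an empty input could trigger the original failure: in Python a name assigned inside a for loop is still visible after the loop, but only if the loop body ran at least once. The job counters in the question show `Map output records=0`, so the reducer plausibly received no lines at all. The function name `last_key` is hypothetical; inside a function the failure surfaces as `UnboundLocalError`, a subclass of the `NameError` a top-level script would raise.

```python
def last_key(lines):
    """Mimic the original reducer: curr_show is assigned only inside the loop."""
    for line in lines:
        curr_show = line.split("\t")[0]
    return curr_show  # fails if the loop body never ran


# Non-empty input, as in the pipe run: the name exists after the loop.
print(last_key(["Show_A\t10"]))  # Show_A

# Empty input, as a reducer with no map output would see.
try:
    last_key([])
except NameError as err:  # UnboundLocalError is a subclass of NameError
    print("failed on empty input:", type(err).__name__)
```

Initializing curr_show before the loop, as done above, removes the exception, but on empty input the final print would then emit a line with a blank key and a count of 0.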

    【Comments】:

      【Solution 3】:

      It looks like you are not using the Hadoop streaming command correctly. Instead of

      hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
         -input /user/cloudera/input \
         -output /user/cloudera/output_join \
         -mapper /home/cloudera/mapper.py \
         -reducer /home/cloudera/reducer.py

      you need to give -mapper the mapper command itself. Try

      hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
         -input /user/cloudera/input \
         -output /user/cloudera/output_join \
         -mapper "python mapper.py" \
         -reducer "python reducer.py" \
         -file /home/cloudera/mapper.py \
         -file /home/cloudera/reducer.py

      You can also check the error logs by opening any failed task from the tracking URL, because the log above is not very helpful.

      【Comments】:

      • I used the same format in a previous job and it worked fine? But I will try the suggested approach and report back. Thanks!