【问题标题】:Hadoop Reducer does not workHadoop 减速器不工作
【发布时间】:2017-07-03 15:01:36
【问题描述】:

我在使用 MapReduce 作业时遇到问题。我的map 函数确实运行并产生了所需的输出。但是,reduce 函数不会运行。似乎该函数永远不会被调用。我使用文本作为键和文本作为值。但我不认为这会导致问题。

输入文件格式如下:

2015-06-06,2015-06-06,40.80239868164062,-73.93379211425781,40.72591781616211,-73.98358154296875,7.71,35.72
2015-06-06,2015-06-06,40.71020126342773,-73.96302032470703,40.72967529296875,-74.00226593017578,3.11,2.19
2015-06-05,2015-06-05,40.68404388427734,-73.97597503662109,40.67932510375977,-73.95581817626953,1.13,1.29
...

我想将一行的第二个日期提取为Text 并将其用作reduce 的键。键的值将是同一行中最后两个 float 值的组合。
即:2015-06-06 7.71 35.72 2015-06-06 9.71 66.72
这样可以将值部分视为由空格分隔的两列。
这实际上有效,我得到一个包含许多相同键但不同值的输出文件。

现在我想总结每个键的两个浮点列,以便在减少之后我得到一个日期作为键,而总和的列作为值。

问题:reduce 不运行。

请看下面的代码:

映射器

public class Aggregate {

public static class EarnDistMapper extends Mapper<Object, Text, Text, Text> {

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String [] splitResult = value.toString().split(",");
        String dropOffDate = "";
        String compEarningDist = "";
        //dropoffDate at pos 1 as key 
        dropOffDate = splitResult[1];
        //distance at pos length-2 and earnings at pos length-1 as values separated by space
        compEarningDist = splitResult[splitResult.length -2] + " " + splitResult[splitResult.length-1];

        context.write(new Text(dropOffDate), new Text(compEarningDist));
    }
}

减速器

public static class EarnDistReducer extends Reducer<Text,Text,Text,Text> {

    public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {

         float sumDistance = 0;
         float sumEarnings = 0;
         String[] splitArray; 

         while (values.hasNext()){
             splitArray = values.next().toString().split("\\s+");
             //distance first
             sumDistance += Float.parseFloat(splitArray[0]);
             sumEarnings += Float.parseFloat(splitArray[1]);
         }

         //combine result to text

         context.write(key, new Text(Float.toString(sumDistance) + " " + Float.toString(sumEarnings)));
    }
}

工作

public static void main(String[] args) throws Exception{
    // TODO
    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "Taxi dropoff");
    job.setJarByClass(Aggregate.class);
    job.setMapperClass(EarnDistMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setCombinerClass(EarnDistReducer.class);
    job.setReducerClass(EarnDistReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

感谢您的帮助!!

【问题讨论】:

  • 当你开始工作时,你会得到什么输出?
  • 输出是2015-06-06 7.71 35.72 2015-06-06 9.71 66.72 加上更多的行。但我从来没有看到汇总的结果。如果我将system.out.println 放在reduce 中,则控制台上没有输出。
  • 你的程序的结构是什么? MapperReducer 类是 Aggregate 的内部类?都在同一个文件中吗?您的输出文件的名称是什么?是part-r-00000 还是part-m-00000
  • @philantrovert 是的,Aggregate 类包装了其他 MapperReducer 类。所以它们是Aggregate 的内部类,所有内容都包含在一个文件中。文件名是part-r-00000

标签: java hadoop mapreduce


【解决方案1】:

reduce 方法的签名错误。你有:

public void reduce(Text key, Iterator<Text> values, Context context) {

应该是:

public void reduce(Text key, Iterable<Text> values, Context context) {

【讨论】:

    猜你喜欢
    • 2014-05-21
    • 1970-01-01
    • 2016-07-02
    • 2016-08-13
    • 1970-01-01
    • 1970-01-01
    • 2018-03-09
    • 2015-01-11
    • 1970-01-01
    相关资源
    最近更新 更多