【问题标题】:Value of output of First Reduce Task as <Key Value> pair in mapreduce第一个 Reduce 任务的输出值作为 mapreduce 中的 <Key Value> 对
【发布时间】:2018-05-28 16:51:43
【问题描述】:

我希望我的第一个 reduce 任务产生类似 (course, ); 在第二个减少任务中,我将计算每门课程的总和/计数。 第一个 reducer 任务充当组合器、求和和计数;第二个减少任务找到每个课程的平均值并输出平均值。我只是找不到将输出值存储为密钥对然后能够检索并对其进行计算的最佳类型。 HashMap 不起作用。

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class AvgGrading {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "avg grading");
        job.setJarByClass(AvgGrading.class);
        job.setMapperClass(MapForAverage.class);
        job.setCombinerClass(ReduceForAverage.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(ReduceForFinal.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Object.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForAverage extends Mapper<LongWritable, Text, LongWritable, Object> {

        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String [] word = value.toString().split(", ");
            float grade = Integer.parseInt(word[1]);
            int course = Integer.parseInt(word[0]);
            Map <Float,Long> m = new HashMap<Float,Long>();
            m.put(grade, (long) 1);
            con.write(new LongWritable(course), m);
        }
    }

    public static class ReduceForAverage extends Reducer<LongWritable, Object, LongWritable, Object> {
        private FloatWritable result = new FloatWritable();
        public void reduce(LongWritable course, Map<Float,Long> values, Context con)
                throws IOException, InterruptedException {

            Map <Float,Long> m = new HashMap<Float,Long>();

            float sum = 0;
            long count =0;
            for (Map.Entry<Float, Long> entry : values.entrySet()) {
                sum += entry.getKey();
                count++;
            }
            m.put(sum, count);

            con.write(course, m);
        }
    }

    public static class ReduceForFinal extends Reducer<LongWritable, Object, LongWritable, FloatWritable> {
        private FloatWritable result = new FloatWritable();
        public void reduce(LongWritable course, Map<Long,Float>values, Context con)
                throws IOException, InterruptedException {

            long key = 0;
            float value=0;

            for ( Map.Entry<Long, Float> entry : values.entrySet()) {
                 key = entry.getKey();
                 value = entry.getValue();
            }
            float res= key/value;

            con.write(course, new FloatWritable(res));
        }
    }
}

请注意,我无法在 Reduce 任务中遍历 Iterable &lt; Map&lt;Float,Int&gt;&gt;,因此我传递的是可能不正确的简单 Map。

错误码是:

Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer

java.lang.NullPointerException

第二个减速器失败

【问题讨论】:

    标签: java hadoop mapreduce


    【解决方案1】:

    Map 没有实现 Writable,你说你的组合器和归约器输入值的类是 Object,而你正在发射 Map。你只需要为此目的创建一个自定义类。请记住,如果您想在 hadoop 中发出某些内容,则自定义类必须实现 Writable。这是你可以做的:

    public class Counter implements Writable {
    
           private float sum;
           private long count;
    
           public Counter(float sum, long count){
                  this.sum = sum;
                  this.count = count;
           }
    
           /* Methods to get and set private variables of the class */
    
           public float getSum() {
                  return sum;
           }
    
           public void setSum(float sumValue) {
                  sum=sumValue;
           }
    
           public long getCount() {
                  return count;
           }
    
           public void setCount(long countValue) {
                  count=countValue;
           }
    
           /* Methods to serialize and deserialize the contents of the
              instances of this class */
    
           @Override /* Serialize the fields of this object to out */ 
           public void write(DataOutput out) throws IOException{
                  out.writeFloat(sum);
                  out.writeLong(count);
           }
    
          @Override /* Deserialize the fields of this object from in */
          public void readFields(DataInputin) throws IOException{
                      sum=in.readFloat();
                      count=in.readLong();
           }
           }
    

    所以在您的第一个映射器中,您可以通过这种方式创建并发出一个计数器:

           Counter counter = new Counter(grade, 1);
           con.write(course, counter);
    

    此时,在您的第一个 reducer 中,您将拥有一个表示课程的键和一个可迭代的所有计数器的可迭代值,并且您可以使用此可迭代计算平均值。记得更新 mapper 和 reducers 类参数以与新参数保持一致。

    【讨论】:

    • 是的,我认为 Map 本身是一个对象,因此它能够以某种方式绕过。我会检查你的解决方案,谢谢你的时间
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-31
    • 1970-01-01
    • 2012-06-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多