【Question title】: Hadoop: Reducer doesn't emit the correct calculation
【Posted】: 2021-06-01 03:23:32
【Question description】:

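I have the following Reducer class (part of a MapReduce job) that should calculate score = POS / (-1*sum(NEGs)),

where POS is a positive number and NEGs are two negative numbers. This is always the case.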

For example, if the input to the mapper is:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0

The expected output is:

<A, A>  0.06666666666666667
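Worked through by hand with the formula above, the expected value for this example is:

```latex
\text{score} = \frac{\mathrm{POS}}{-1 \cdot \sum \mathrm{NEGs}}
             = \frac{2.0}{-1 \cdot \bigl(-15.0 + (-15.0)\bigr)}
             = \frac{2.0}{30.0}
             = \frac{1}{15} \approx 0.0667
```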

However, it outputs Infinity for every output record:

<A, A>  Infinity

While debugging, if I add the following statements inside the while loop to emit each value:

score.set(val);
context.write(key, score);

the values are printed correctly, but the division is repeated, so I get the following:

<A, A>  -15.0
<A, A>  2.0
<A, A>  -15.0
<A, A>  0.06666666666666667   # correct calculation (2/30)
<A, A>  0.0022222222222222222 # Not sure why it divides by 30 twice (2/30/30)!!

Here is MyReducer:

private static class MyReducer extends
        Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
    private DoubleWritable score = new DoubleWritable();
    int counter = 0;

    @Override
    public void reduce(Pair key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<DoubleWritable> iter = values.iterator();
        double nor = 0.0;
        double don = 0.0;

        double val;
        while (iter.hasNext()) {
            val = iter.next().get();
            if (val < 0)
                don += val*-1;
            else
                nor = val;
            //uncomment for debugging!
            //score.set(val);
            //context.write(key, score);
        }

        score.set(nor / don);
        context.write(key, score);
    }
}

Can anyone explain why

  • it emits Infinity if I don't emit anything inside the while loop, and
  • it divides by the denominator twice?

Thanks!

【Question discussion】:

    Tags: hadoop mapreduce reducers


    【Solution 1】:

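    Doubles behaving strangely in Java is certainly far from rare. In this particular case, though, the odd results are not caused by quirks of doubles themselves but by how the values are passed through the Hadoop job.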

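    First of all, a reduce computation like this one must be used only in the Reduce phase of the job, never in the Combine phase (if there is one). If you have also set this reduce class as the combiner (for example with job.setCombinerClass(MyReducer.class)), consider removing that setting. A combiner may run zero, one, or several times on the map output, and whatever it emits is passed to the reducer as if it were ordinary map output. It is therefore only safe for computations whose output can be fed back in as input, such as sums or maximums, and not for a ratio like this score. This is not a hard rule, but it is behind many MapReduce bugs where people cannot figure out why the reducer receives strange data or performs the calculation twice in a row (just as you pointed out).

    In your case, suppose the combiner has already turned -15.0, 2.0, -15.0 into 0.0667. The reducer then receives only that single positive value, the denominator stays 0.0, and 0.0667 / 0.0 is Infinity. With your debugging writes enabled, the combiner also re-emits the raw values. The reducer then gets -15.0, 2.0, -15.0 and 0.0667, and divides 0.0667 by 30 a second time.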
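    The following plain-Java sketch (no Hadoop needed) shows how a combiner can produce exactly the two symptoms from the question. It replays MyReducer's arithmetic on the question's values. It assumes that the driver, which the question does not show, also registers MyReducer as the combiner. CombinerDemo, score and scoreWithDebugEcho are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombinerDemo {

    // Same arithmetic as MyReducer.reduce(): the last non-negative value is the
    // numerator, the negated sum of the negative values is the denominator.
    public static double score(List<Double> values) {
        double nor = 0.0;
        double don = 0.0;
        for (double val : values) {
            if (val < 0)
                don += val * -1;
            else
                nor = val;
        }
        return nor / don;
    }

    // Same as score(), but first "emits" every input value, like the
    // commented-out debugging writes inside the while loop.
    public static List<Double> scoreWithDebugEcho(List<Double> values) {
        List<Double> out = new ArrayList<>(values);
        out.add(score(values));
        return out;
    }

    public static void main(String[] args) {
        List<Double> mapOutput = Arrays.asList(-15.0, 2.0, -15.0);

        // 1) Reducer only: the intended result, 2 / 30.
        System.out.println("reducer only:       " + score(mapOutput));

        // 2) Assumed combiner run first: the reducer sees only the ratio,
        //    so the denominator stays 0.0 and the result is Infinity.
        List<Double> combined = Arrays.asList(score(mapOutput));
        System.out.println("combiner + reducer: " + score(combined));

        // 3) Combiner with debugging writes: the reducer sees -15, 2, -15 and the ratio.
        //    Assumes the ratio arrives after 2.0 (Hadoop does not guarantee value order),
        //    so the ratio becomes the numerator and is divided by 30 again.
        List<Double> combinedDebug = scoreWithDebugEcho(mapOutput);
        System.out.println("with debug writes:  " + scoreWithDebugEcho(combinedDebug));
    }
}
```

    Hadoop does not guarantee the order of values within a key. If the ratio happened to arrive before 2.0, the last non-negative value would be 2.0, and the output would happen to look correct.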

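    Another point concerns the division itself: you can use type casting to make it explicit that a double-typed division is intended. Keep in mind, though, that in Java a cast only changes the result of a division when an operand is an integer type. Both operands here are already double, so the cast alone will not remove the Infinity; the combiner setting above is the more likely culprit.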
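    To make the casting point concrete, here is a small standalone sketch contrasting integer and double operands. The class and method names are hypothetical, chosen only for this illustration.

```java
public class CastDemo {

    // Integer operands: '/' is integer division and truncates toward zero;
    // the int result is only widened to double after the division.
    public static double intDivision(int pos, int negSum) {
        return pos / negSum;
    }

    // Casting one int operand to double forces floating-point division.
    public static double castIntDivision(int pos, int negSum) {
        return (double) pos / negSum;
    }

    // Both operands already double: the (double) cast is a no-op.
    public static double doubleDivision(double pos, double neg, boolean cast) {
        return cast ? (double) pos / (-1 * neg) : pos / (-1 * neg);
    }

    public static void main(String[] args) {
        System.out.println(intDivision(2, 30));      // 0.0
        System.out.println(castIntDivision(2, 30));
        System.out.println(doubleDivision(2.0, -30.0, false) == doubleDivision(2.0, -30.0, true)); // true
    }
}
```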

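    To demonstrate this, I used a sample input based on your data, stored in the input directory. Each unique key has one positive and two negative numbers as values (for simplicity, the keys here are plain Strings), as follows: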

    A -15.0
    A 2.0
    A -15.0
    B -10.0
    B 9.0
    B -12.0
    C -7.0
    C 1.0
    C -19.0
    D -5.0
    D 18.0
    D -5.0
    E -6.0
    E 6.0
    E -6.0
    

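    Each score is then computed using explicit type casting, as shown in the code below. Note that the driver registers Reduce only as the reducer; there is no setCombinerClass(...) call: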

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.fs.FileSystem;
    
    import java.io.IOException;
    
    
    public class ScoreComp 
    {
        /* input:  <Character, Number>
         * output: <Character, Number>
         */
        public static class Map extends Mapper<Object, Text, Text, DoubleWritable> 
        {
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                String record = value.toString().trim();
                if (record.isEmpty())
                    return; // skip blank lines
                String[] parts = record.split("\\s+"); // split each line into key and value on any run of whitespace
    
                // create key-value pairs from each line
                context.write(new Text(parts[0]), new DoubleWritable(Double.parseDouble(parts[1])));
            }
        }
    
        /* input:  <Character, Number>
         * output: <Character, Score>
         */
        public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
        {
            @Override
            public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
            {
                double pos = 0.0;
                double neg = 0.0;
    
                // for every value of a unique key...
                for(DoubleWritable value : values)
                {
                    // retrieve the positive number and calculate the sum of the two negative numbers
                    if(value.get() < 0)
                        neg += value.get();
                    else
                        pos = value.get();
                }
    
                // calculate the score based on the values of each key (using explicit type casting)
                double result = (double) pos / (-1 * neg);
    
                // create key-value pairs for each key with its score
                context.write(key, new DoubleWritable(result));
            }
        }
    
    
        public static void main(String[] args) throws Exception
        {
            // set the paths of the input and output directories in the HDFS
            Path input_dir = new Path("input");
            Path output_dir = new Path("scores");
    
            // in case the output directory already exists, delete it
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(output_dir))
                fs.delete(output_dir, true);
    
            // configure the MapReduce job
            Job scorecomp_job = Job.getInstance(conf, "Score Computation");
            scorecomp_job.setJarByClass(ScoreComp.class);
            scorecomp_job.setMapperClass(Map.class);
            scorecomp_job.setReducerClass(Reduce.class);
            // deliberately no setCombinerClass(Reduce.class): a ratio is not safe to pre-aggregate
            scorecomp_job.setMapOutputKeyClass(Text.class);
            scorecomp_job.setMapOutputValueClass(DoubleWritable.class);
            scorecomp_job.setOutputKeyClass(Text.class);
            scorecomp_job.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(scorecomp_job, input_dir);
            FileOutputFormat.setOutputPath(scorecomp_job, output_dir);
            scorecomp_job.waitForCompletion(true);
        }
    }
    

    The results of the MapReduce job in the scores output directory (inspected through the HDFS browser) make sense mathematically.
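    To check those numbers without a cluster, here is a minimal local sketch. It applies the same rule as the Reduce class to the sample values above, grouped by key as the shuffle would group them. ExpectedScores and its methods are hypothetical names used only for this check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedScores {

    // Same rule as the Reduce class: score = pos / (-1 * sum of negatives).
    public static double score(double[] values) {
        double pos = 0.0;
        double neg = 0.0;
        for (double v : values) {
            if (v < 0)
                neg += v;
            else
                pos = v;
        }
        return pos / (-1 * neg);
    }

    // Sample input from this answer, grouped per key.
    public static Map<String, Double> computeAll() {
        Map<String, double[]> grouped = new LinkedHashMap<>();
        grouped.put("A", new double[] {-15.0, 2.0, -15.0});
        grouped.put("B", new double[] {-10.0, 9.0, -12.0});
        grouped.put("C", new double[] {-7.0, 1.0, -19.0});
        grouped.put("D", new double[] {-5.0, 18.0, -5.0});
        grouped.put("E", new double[] {-6.0, 6.0, -6.0});

        Map<String, Double> scores = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : grouped.entrySet())
            scores.put(e.getKey(), score(e.getValue()));
        return scores;
    }

    public static void main(String[] args) {
        // Tab-separated, like the default TextOutputFormat key/value separator.
        computeAll().forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

    The expected scores are 2/30, 9/22, 1/26, 18/10 and 6/12 for A to E.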
