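【Question title】:Reducer not picking mapper output file
【Posted】:2016-09-18 21:42:35
【Question description】:

I have 4 files in a folder, and the folder location is my input path argument. I need to find the word count of each file separately, and each result should be written to a file with the same name as the input file.

I have written the mapper class, and it writes its output correctly to the specified files. However, that output is not processed by the reducer. What I did wrong is that I did not use the "context" when writing the mapper output, so nothing is passed to the reducer and it produces blank output. The mapper itself runs as required and saves the files in the correct location with the expected file names. I want these files to go through shuffle and sort and then be passed to the reducer. Please correct me. Thanks.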

Mapper

package com.oracle.hadoop.multiwordcount;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiWordCountMapper extends
    Mapper<LongWritable, Text, Text, LongWritable> {

protected String filenamekey;
private RecordWriter<Text, LongWritable> writer;

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

    // Read the line
    String line = value.toString();

    // Split the line into words
    String[] words = line.split(" ");

    // Assign count(1) to each word
    for (String word : words) {
        writer.write(new Text(word), new LongWritable(1));
    }

}

protected void setup(Context context) throws IOException,
        InterruptedException {
    InputSplit split = context.getInputSplit();
    Path path = ((FileSplit) split).getPath();

    // extract parent folder and filename
    filenamekey = path.getParent().getName() + "/" + path.getName();

    // base output folder
    final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
    // output file name
    final Path outputFilePath = new Path(baseOutputPath, filenamekey);

    // We need to override the getDefaultWorkFile path to stop the file
    // being created in the _temporary/taskid folder
    TextOutputFormat<Text, LongWritable> tof = new TextOutputFormat<Text, LongWritable>() {
        @Override
        public Path getDefaultWorkFile(TaskAttemptContext context,
                String extension) throws IOException {
            return outputFilePath;

        }

    };
    // create a record writer that will write to the desired output
    // subfolder
    writer = tof.getRecordWriter(context);

}

protected void cleanup(Context context) throws IOException,
        InterruptedException {
    writer.close(context);
};
}

Reducer

package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiWordCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

/*
 * private MultipleOutputs multiouputs;
 * 
 * protected void setup(Context context) throws java.io.IOException
 * ,InterruptedException { multiouputs = new MultipleOutputs(context);
 * 
 * }
 */
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws java.io.IOException, InterruptedException {
    // Sum the List of values
    long sum = 0;
    for (LongWritable value : values) {
        sum = sum + value.get();
    }

    // Assign Sum to corresponding Word
    context.write(key, new LongWritable(sum));

}
/*
 * protected void cleanup(Context context) throws java.io.IOException
 * ,InterruptedException { multiouputs.close(); };
 */

}

Driver

package com.oracle.hadoop.multiwordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiWordCountJob implements Tool {
    private Configuration conf;
@Override
public Configuration getConf() {
    return conf;
}

@Override
public void setConf(Configuration conf) {
    this.conf = conf;
}

@Override
public int run(String[] args) throws Exception {
    @SuppressWarnings("deprecation")
    Job mwcj = new Job(getConf());

    // setting the job name
    mwcj.setJobName("Multiple file WordCount Job");

    // to call this as a jar
    mwcj.setJarByClass(this.getClass());

    // setting custom mapper class
    mwcj.setMapperClass(MultiWordCountMapper.class);

    // setting custom reducer class
    mwcj.setReducerClass(MultiWordCountReducer.class);

    // setting no of reducers
    // mwcj.setNumReduceTasks(0);

    // setting custom partitioner class
    // mwcj.setPartitionerClass(WordCountPartitioner.class);

    // setting mapper output key class: K2
    mwcj.setMapOutputKeyClass(Text.class);

    // setting mapper output value class: V2
    mwcj.setMapOutputValueClass(LongWritable.class);

    // setting reducer output key class: K3
    mwcj.setOutputKeyClass(Text.class);

    // setting reducer output value class: V3
    mwcj.setOutputValueClass(LongWritable.class);

    // setting the input format class ,i.e for K1, V1
    mwcj.setInputFormatClass(TextInputFormat.class);

    // setting the output format class
    LazyOutputFormat.setOutputFormatClass(mwcj, TextOutputFormat.class);
    // mwcj.setOutputFormatClass(TextOutputFormat.class);

    // setting the input file path
    FileInputFormat.addInputPath(mwcj, new Path(args[0]));

    // setting the output folder path
    FileOutputFormat.setOutputPath(mwcj, new Path(args[1]));

    Path outputpath = new Path(args[1]);
    // delete the output folder if exists
    outputpath.getFileSystem(conf).delete(outputpath, true);

    // to execute the job and return the status
    return mwcj.waitForCompletion(true) ? 0 : -1;

}

public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(new Configuration(),
            new MultiWordCountJob(), args);

    System.out.println("My Status: " + status);
}

}

【Comments】:

    Tags: hadoop mapreduce


    【Solution 1】:

    In your driver class, the number of reducers is set to 0 -->

    // setting no of reducers
    
    mwcj.setNumReduceTasks(0);
    

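    Set it to any value greater than 0. The reducer will then run.

    【Comments】:

    • I have already tried that. I set the number of reducers to 4, but the result is the same.
    • True, but that is not enough to make this code work. Nice catch, though.
    【Solution 2】: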

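    Use MultipleOutputs instead of writing to the files directly, and pass the key/value pairs to the reducer with the context.write() method as usual.

    Of course, as siddhartha jain said, you cannot have a reduce phase if numReduceTasks is set to 0. In that case the job ends after the map phase.

    To quote the MultipleOutputs documentation:

    The MultipleOutputs class simplifies writing to additional outputs other than the job default output via the OutputCollector passed to the map() and reduce() methods of the Mapper and Reducer implementations.
    ...
    When named outputs are used within a Mapper implementation, key/values written to a name output are not part of the reduce phase, only key/values written to the job OutputCollector are part of the reduce phase.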
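
The point of that quote can be illustrated without a cluster. The following is a plain-Java simulation, not Hadoop code: FakeContext, mapToSideWriter and mapToContext are hypothetical names made up for this sketch, and the "shuffle" is just a sorted map. It only shows that pairs written to a side writer never reach the reduce step, while pairs written to the context do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the map -> shuffle -> reduce flow; no Hadoop classes are used.
final class ShuffleSketch {

    /** Hypothetical stand-in for Mapper.Context: collects what the framework would shuffle. */
    static final class FakeContext {
        final Map<String, List<Long>> shuffled = new TreeMap<>();

        void write(String key, long value) {
            shuffled.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    /** Mimics the question's mapper: every pair goes to a side file, none to the context. */
    static void mapToSideWriter(String line, List<String> sideFile, FakeContext context) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                sideFile.add(word + "\t1"); // the context is never touched
            }
        }
    }

    /** Mimics the suggested fix: every pair is written to the context. */
    static void mapToContext(String line, FakeContext context) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                context.write(word, 1L);
            }
        }
    }

    /** Mimics the reduce phase: sums the grouped values of each key. */
    static Map<String, Long> reduce(FakeContext context) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, List<Long>> group : context.shuffled.entrySet()) {
            long sum = 0;
            for (long value : group.getValue()) {
                sum += value;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String line = "to be or not to be";

        FakeContext bypassed = new FakeContext();
        mapToSideWriter(line, new ArrayList<>(), bypassed);
        System.out.println("side writer   -> reducer sees " + reduce(bypassed));

        FakeContext used = new FakeContext();
        mapToContext(line, used);
        System.out.println("context.write -> reducer sees " + reduce(used));
    }
}
```

In the first case the reduce step receives an empty input, which matches the blank reducer output described in the question.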

    To process each input file separately, see my answer in the related post.
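
For orientation, here is a minimal sketch of how the advice in this answer might be applied to the per-file word count from the question. It is not the code from the related post. The class names and the tab-separated "fileName<TAB>word" composite key are assumptions made for illustration, and it assumes that file names contain no tab character and that the input format produces FileSplit instances (as TextInputFormat does). It needs the Hadoop MapReduce libraries on the classpath.

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch only: class names and the composite key layout are illustrative assumptions.
public class PerFileWordCount {

    public static class PerFileMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private static final LongWritable ONE = new LongWritable(1);
        private final Text outKey = new Text();
        private String fileName;

        @Override
        protected void setup(Context context) {
            // Assumes a FileSplit-based input format such as TextInputFormat.
            fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        }

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            for (String word : line.toString().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Writing through the context is what sends the pair to shuffle and sort.
                outKey.set(fileName + "\t" + word);
                context.write(outKey, ONE);
            }
        }
    }

    public static class PerFileReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        private MultipleOutputs<Text, LongWritable> outputs;

        @Override
        protected void setup(Context context) {
            outputs = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            // parts[0] is the input file name, parts[1] is the word.
            String[] parts = key.toString().split("\t", 2);
            // The third argument is the base output path, relative to the job output folder.
            outputs.write(new Text(parts[1]), new LongWritable(sum), parts[0]);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            outputs.close();
        }
    }
}
```

With this layout the reducer writes one file per input file name under the job output folder, with the usual -r-NNNNN suffix appended by Hadoop. The driver in the question already uses LazyOutputFormat, which should keep empty default part files from being created.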

    【Comments】:
