【问题标题】:How to save Mapreduce's Reducer output without Key,Value pair?如何在没有键值对的情况下保存 Mapreduce 的 Reducer 输出?
【发布时间】:2019-08-15 08:13:12
【问题描述】:

我正在编写一个 Mapreduce 程序来处理 Dicom 图像。 这个 Mapreduce 程序的目的是处理 dicom 图像,从中提取元数据,索引到 solr,最后在 Reducer 阶段它应该将原始图像保存在 hdfs 中。 我想将相同的文件保存在 HDFS 中作为减速器输出

所以我已经实现了大部分功能,但是在减速器阶段将相同的文件存储在 hdfs 中时它不起作用。

我已经用 dicom 图像查看器测试了处理后的 Dicom 文件,它说文件已弯曲,而且处理后的 dicom 文件的大小略有增加。 Ex. 原始 Dicom 大小为 628Kb,当 reducer 将此文件保存在 hdfs 中时,其大小变为 630Kb。

我已经尝试了这些链接的解决方案,但没有一个给出预期的结果。

Hadoop mapReduce How to store only values in HDFS

Hadoop - How to Collect Text Output Without Values

这是将 Dicom 文件作为单个文件读取(不拆分)的代码。

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }       
}

自定义 RecordReader

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {     
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();     
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            System.out.println("Inside nextKeyvalue");
            System.out.println(fileSplit.getLength());
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
                processed = true;
                return true;
            }
            return false;
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException 
    {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

}

映射器类 映射器类可以根据我们的需要完美运行。

public class MapClass{

    public static class Map extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{   

        @Override
        protected void map(NullWritable key, BytesWritable value,
                Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            value.setCapacity(value.getLength());
            InputStream in = new ByteArrayInputStream(value.getBytes());            
            ProcessDicom.metadata(in); // Process dicom image and extract metadata from it
            Text keyOut = getFileName(context);
            context.write(keyOut, value);

        }

        private Text getFileName(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
        {
            InputSplit spl = context.getInputSplit();
            Path filePath = ((FileSplit)spl).getPath();
            String fileName = filePath.getName();
            Text text = new Text(fileName);
            return text;
        }

        @Override
        protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
        }

    }

Reducer 类 这是减速器类。 公共类 ReduceClass{

    public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, BytesWritable>{

        @Override
            protected void reduce(Text key, Iterable<BytesWritable> value,
                    Reducer<Text, BytesWritable, BytesWritable, BytesWritable>.Context context)
                    throws IOException, InterruptedException {

            Iterator<BytesWritable> itr = value.iterator();
            while(itr.hasNext())
            {
                BytesWritable wr = itr.next();
                wr.setCapacity(wr.getLength());
                context.write(new BytesWritable(key.copyBytes()), itr.next());
            }
        }
}

主类

public class DicomIndexer{

    public static void main(String[] argss) throws Exception{
        String args[] = {"file:///home/b3ds/storage/dd","hdfs://192.168.38.68:8020/output"};
        run(args);
    }

    public static void run(String[] args) throws Exception {

        //Initialize the Hadoop job and set the jar as well as the name of the Job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(WordCount.class);
//      job.getConfiguration().set("mapreduce.output.basename", "hi");
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

所以我完全不知道该做什么。一些链接说这是不可能的,因为 Mapreduce 可以在 pair 上工作,有些说要使用 NullWritable。到目前为止,我已经尝试过 NullWritable、SequenceFileOutputFormat,但它们都不起作用。

【问题讨论】:

    标签: hadoop mapreduce hdfs


    【解决方案1】:

    两件事:

    1. 您无意中通过调用itr.next() 两次在reducer 中一次消耗了两个元素,这无济于事。

    2. 正如您所确定的,当您只想编写一个键和一个值时,您正在编写一个键和一个值。而是使用 NullWritable 作为值。您的减速器将如下所示:

      public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, NullWritable>{
          @Override
          protected void reduce(Text key, Iterable<BytesWritable> value,
                                Reducer<Text, BytesWritable, BytesWritable, NullWritable>.Context context)
                  throws IOException, InterruptedException {
              NullWritable nullWritable = NullWritable.get();
              Iterator<BytesWritable> itr = value.iterator();
              while(itr.hasNext())
              {
                  BytesWritable wr = itr.next();
                  wr.setCapacity(wr.getLength());
                  context.write(wr, nullWritable);
              }
          }
      }
      

    【讨论】:

    • 另外,你需要打电话给setCapacity()吗?我会尝试不使用它。
    • 嘿 Ben 感谢您的帮助,我忘记评论 itr.next(),我也尝试过,但没有成功。无论如何,我找到了解决方案。我已经创建了一个自定义 RecordWriter 和 Custom fileoutput 格式并且它可以工作,但我仍然不知道它是否是正确的方法。我会尽快发布答案,请在您有空的时候查看它。
    猜你喜欢
    • 2013-10-04
    • 2012-05-03
    • 2016-11-22
    • 2019-09-01
    • 2014-05-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多