回顾:

  在上一篇https://www.cnblogs.com/superlsj/p/11857691.html详细介绍了InputFormat的原理和常见的实现类。总结来说,InputFormat是将文件切片----->再转化为<key--value>对转交给Mapper处理。

  所以我们看到在InputFormat类中只有两个方法,一个负责切片,一个返回能将切片信息转化为相应的键值对的对象:

public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

  以KeyValueInputFormat为例:

@Stable
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }

    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
        return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        context.setStatus(genericSplit.toString());
        return new KeyValueLineRecordReader(context.getConfiguration());
    }
}

  我们知道:当使用KeyValueInputFormat并设置分隔符后,Mapper以分隔符前的内容作为Key来接收,以分隔符后面的内容作为Value来接收。那么在数据提交到Mapper之前,数据就必须被格式化为满足Mapper接收的格式,这个工作就是由InputFormat来完成的,而InputFormat实际上并不能完成这项工作,而是创建一个RecordReader来完成这项转换工作。顺带一提:isSplitable方法返回文件是否可以切片,当返回false时,表示在格式化输入文件时,不对文件进行切片,直接进行文本数据至键值对的转化。

一、设计自己的InputFormat:

  现有的InputFormat肯定是无法满足现实中花里胡哨的需求,所以自定义InputFormat是一项不可避免的工作。下面以将三个小文件合并处理,以文件名作为Key,文件内容的byte数组作为value,转换成一个SquenceFile文件的案例来演示自定义InputFormat的流程。SuquenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式,相比于纯文本文件,格式更加紧凑。

  1、自定义InputFormat

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new WholeFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

  2、自定义RecordReader

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private boolean notRead = true;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private FSDataInputStream inputStream;
    private FileSplit fs;
    /**
     * 初始化方法,框架会在读取切片数据之前调用此方法,因此,一些在RecordReader工作时需要使用的资源可以在此方法中进行初始化(这些资源必须是可以在inputSplit和taskAttemptContext中可以获取到的)。
* InputSplit inputSplit:当前RecordReader正在处理的切片
   * TaskAttemptContext taskAttemptContext:当前Job的上下文,可以通过此对象获取job的配置对象
*/
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {   
    fs = (FileSplit)inputSplit;
 IOException { 
    IOUtils.closeStream(inputStream);
  }
}

  3、测试,本案例中Mapper和Redu啥也不用干,所以不用写,用默认提供的就行,是需要写一个Driver。

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WholeFileDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);//【注意】

        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0:1);
    }
}

 

相关文章: