一、输入格式

  1、输入分片split

      一个分片对应一个map任务;

      一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行;

      分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据;

      map处理时会用分片的大小来排序,优先处理最大的分片;

 

  hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置

    public abstract class InputSplit{
         public abstract long getLength() throws IOException, InterruptedException;       
         public abstract String[] getLocations() throws IOException, InterruptedException;       
    }

  InputSplit不需要手动去处理它,它是由InputFormat生成;InputFormat负责产生输入分片并将它们分割成记录:

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits( JobContext context) throws IOException, InterruptedException;
        public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
    }   

  InputFormat抽象类定义的两个方法:getSplits() 和 createRecordReader()

  运行作业的客户端会调用getSplits()来计算分片,然后将它们发送到jobtracker,jobtracker会使用其存储位置来调度map任务从而在tasktracker上来处理这个分片数据。在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的RecordReader。RecordReader就是一个集合迭代器,map任务用一个RecordReader来生成记录的键/值对,然后再传递给map函数。

  2、FileInputFormat类

    FileInputFormat类是所有指定数据源实现类的基类,它本身主要有两个功能:a. 指定输入文件位置;b. 输入文件生成分片的实现代码段,具体实现由子类完成;

    继承图:

    Hadoop MapReduce输入输出类型    

 

    设置输入文件位置:

      FileInputFormat.addInputPath(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));

      或 FileInputFormat.setInputPaths(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));      

      可添加文件过滤器, FileInputFormat 中静态方法:

        public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)

        文件添加时,默认就会有一个过滤器,过滤掉"." 和 "_"开头的文件,会过滤掉隐藏文件;自定义的过滤器也是在默认过滤的基础上过滤;

 

    切分的分片大小:

        一个split的大小计算:max( minimumSize, min( maximumSize, blockSize ));

        minimumSize默认为1,maximumSize默认为Long.MAX_VALUE;

        所以通常 blockSize 在 minimumSize和maximumSize之间,所以一般分片大小就是块大小。

 

    设置不切分文件:

        两种方法:

          a. 设置minimumSize的大小为Long.MAX_VALUE;

          b. 在实现FileInputFormat的子类时,重写isSplitable()方法返回为false;

     

    在mapper中获取文件分片信息:

        在mapper中可以获取当前处理的分片的信息,可通过context.getInputSplit()方法来获取一个split;当输入的格式源于FileInputFormat时,该方法返回的InputSplit可以被强制转换化一个FileSplit(继承自InputSplit),可调用如下信息:

           a. getPath()  Path/String  文件的路径

           b. getStart() long

             c. getLength() long

    

     自定义一个输入格式,把整个文件作为一条记录: 

// Example 7-2. An InputFormat for reading a whole file as a record
class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

//主要是实现RecordReader类
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing }
    }
}
View Code

相关文章: