转载自: http://www.cnblogs.com/lxf20061900/p/3810977.html

 

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:

  (1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;

  (2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。

  InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。

  InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。

  RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。

  我们本节就以最长使用的TextInputFormat为列来讲解分片和读取分片数据。

  先看继承关系:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最顶的父类InputFormat只有两个未实现的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比较多,如下图:

MapReduce中TextInputFormat分片和读取分片数据源码级分析,我们在自己的MR程序中设置输入目录就是调用这里的方法;TextInputFormat这个类只有俩个方法,代码如下:

MapReduce中TextInputFormat分片和读取分片数据源码级分析
 1 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
 2  * Either linefeed or carriage-return are used to signal end of line.  Keys are
 3  * the position in the file, and values are the line of text.. */
 4 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
 5 
 6   @Override
 7   public RecordReader<LongWritable, Text> 
 8     createRecordReader(InputSplit split,
 9                        TaskAttemptContext context) {
10     return new LineRecordReader();
11   }
12 
13   @Override
14   protected boolean isSplitable(JobContext context, Path file) {
15     CompressionCodec codec = 
16       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
17     return codec == null;
18   }
19 
20 }
MapReduce中TextInputFormat分片和读取分片数据源码级分析

  isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。

  接下来,我们只关注上面说的那两个主要方法,首先来看:

  一、getSplits方法,这个方法在FileInputFormat类中,它的子类一般只需要实现TextInputFormat中的两个方法而已,getSplits方法代码如下: 

 1 /** 
 2    * Generate the list of files and make them into FileSplits.
 3    */ 
 4   public List<InputSplit> getSplits(JobContext job
 5                                     ) throws IOException {
 6     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 7     long maxSize = getMaxSplitSize(job);    //Long.MAX_VALUE
 8 
 9     // generate splits
10     List<InputSplit> splits = new ArrayList<InputSplit>();
11     List<FileStatus>files = listStatus(job);
12     for (FileStatus file: files) {
13       Path path = file.getPath();
14       FileSystem fs = path.getFileSystem(job.getConfiguration());
15       long length = file.getLen();    //整个文件的长度
16       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
17       if ((length != 0) && isSplitable(job, path)) {    //默认是true,但是如果是压缩的,则是false
18         long blockSize = file.getBlockSize();        //64M,67108864B
19         long splitSize = computeSplitSize(blockSize, minSize, maxSize);    //计算split大小  Math.max(minSize, Math.min(maxSize, blockSize))
20 
21         long bytesRemaining = length;            
22         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
23           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
24           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
25                                    blkLocations[blkIndex].getHosts()));        //hosts是主机名,name是IP
26           bytesRemaining -= splitSize;        //剩余块的大小
27         }
28         
29         if (bytesRemaining != 0) {    //最后一个
30           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
31                      blkLocations[blkLocations.length-1].getHosts()));
32         }
33       } else if (length != 0) {    //isSplitable(job, path)等于false
34         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
35       } else {
36         //Create empty hosts array for zero length files
37         splits.add(new FileSplit(path, 0, length, new String[0]));
38       }
39     }
40     
41     // Save the number of input files in the job-conf
42     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
43 
44     LOG.debug("Total # of splits: " + splits.size());
45     return splits;
46   }
View Code

相关文章: