转载自: http://www.cnblogs.com/lxf20061900/p/3810977.html
InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:
(1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split;
(2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用。
InputFormat有两个比较重要的方法:(1)List<InputSplit> getSplits(JobContext job);(2)RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context)。这两个方法分别对应上面的两个功能。
InputSplit分片信息有两个特点:(1)是逻辑分片,只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,只记录一些元信息,比如起始位置、长度以及所在的节点列表等;(2)必须可序列化,分片信息要上传到HDFS文件,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。
RecordReader对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value,会作为MapTask的map方法的输入。
我们本节就以最长使用的TextInputFormat为列来讲解分片和读取分片数据。
先看继承关系:(1)public class TextInputFormat extends FileInputFormat;(2)public abstract class FileInputFormat<K, V> extends InputFormat;(3)public abstract class InputFormat。最顶的父类InputFormat只有两个未实现的抽象方法getSplits和createRecordReader;而FileInputFormat包含的方法比较多,如下图:
,我们在自己的MR程序中设置输入目录就是调用这里的方法;TextInputFormat这个类只有俩个方法,代码如下:
1 /** An {@link InputFormat} for plain text files. Files are broken into lines.
2 * Either linefeed or carriage-return are used to signal end of line. Keys are
3 * the position in the file, and values are the line of text.. */
4 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
5
6 @Override
7 public RecordReader<LongWritable, Text>
8 createRecordReader(InputSplit split,
9 TaskAttemptContext context) {
10 return new LineRecordReader();
11 }
12
13 @Override
14 protected boolean isSplitable(JobContext context, Path file) {
15 CompressionCodec codec =
16 new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
17 return codec == null;
18 }
19
20 }
isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
接下来,我们只关注上面说的那两个主要方法,首先来看:
一、getSplits方法,这个方法在FileInputFormat类中,它的子类一般只需要实现TextInputFormat中的两个方法而已,getSplits方法代码如下:
1 /** 2 * Generate the list of files and make them into FileSplits. 3 */ 4 public List<InputSplit> getSplits(JobContext job 5 ) throws IOException { 6 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 7 long maxSize = getMaxSplitSize(job); //Long.MAX_VALUE 8 9 // generate splits 10 List<InputSplit> splits = new ArrayList<InputSplit>(); 11 List<FileStatus>files = listStatus(job); 12 for (FileStatus file: files) { 13 Path path = file.getPath(); 14 FileSystem fs = path.getFileSystem(job.getConfiguration()); 15 long length = file.getLen(); //整个文件的长度 16 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); 17 if ((length != 0) && isSplitable(job, path)) { //默认是true,但是如果是压缩的,则是false 18 long blockSize = file.getBlockSize(); //64M,67108864B 19 long splitSize = computeSplitSize(blockSize, minSize, maxSize); //计算split大小 Math.max(minSize, Math.min(maxSize, blockSize)) 20 21 long bytesRemaining = length; 22 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 23 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 24 splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 25 blkLocations[blkIndex].getHosts())); //hosts是主机名,name是IP 26 bytesRemaining -= splitSize; //剩余块的大小 27 } 28 29 if (bytesRemaining != 0) { //最后一个 30 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 31 blkLocations[blkLocations.length-1].getHosts())); 32 } 33 } else if (length != 0) { //isSplitable(job, path)等于false 34 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); 35 } else { 36 //Create empty hosts array for zero length files 37 splits.add(new FileSplit(path, 0, length, new String[0])); 38 } 39 } 40 41 // Save the number of input files in the job-conf 42 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 43 44 LOG.debug("Total # of splits: " + splits.size()); 45 return splits; 46 }