InputFormat 概述

  • 问题引入:
    在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件,二进制格式文件,数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
  • 问题答案:
    显然,在数据传递给MapTask之前,需要对数据进行切片处理等工作。下图是一个完整的Map,Reduce数据处理流程。
    Hadoop InputFormat 数据处理机制
    流程中的第一个步骤,就是InputFormat.

InputFormat 实现详解

  • Hadoop数据处理的第一步由 Inputformat 实现类完成。Hadoop框架提供了一个 InputFormat 接口。数据处理相关的InputFormat类都需要实现此接口。
    Hadoop InputFormat 数据处理机制
    这个接口有2个方法,如下图所示。
    Hadoop InputFormat 数据处理机制
    此方法用于数据分片,返回一个数据分片的数组。
    Hadoop InputFormat 数据处理机制
    此方法返回一个 RecordReader ,用来将数据转换成键值对的形式。

InputFormat 实现类

Hadoop中,抽象类 InputFormat,实现了InputFormat接口。相关实现类的继承树如下图所示。
Hadoop InputFormat 数据处理机制

FileInputFormat

此类是一个抽象类,间接实现了 InputFormat 接口。
FileInputFormat 常见的接口实现类包括:TextInputFormat,KeyValueTextInputFormat,NLineInputFormat,CombineTextInputFormat 等

  • 切片机制
    1. 简单地按照文件的内容长度进行切片
    2. 切片的大小,默认等于Block块的大小 (128MB)
    3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
  • 案例分析
    Hadoop InputFormat 数据处理机制

TextInputFormat

  • 切片机制
    1. TextInputFormat 切片机制和 FileInputFormat 切片机制相同
  • 使用场景
    1. TextInputFormat 是默认的FileInputFormat 实现类。按行读取每条记录
    2. 键是储存该行在整个文件中的起始字节偏移量,LongWritable类型
    3. 值是这行的内容,不包括任何终止字符(换行符和回车符)
  • 案例分析
    以下是一个示例,比如,一个分片包含了如下4条文本的记录。
    Hadoop InputFormat 数据处理机制
    经过TextInputFormat处理后的键值对如下:
    Hadoop InputFormat 数据处理机制

CombineTextInputFormat

  • 应用场景
    框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
  • 虚拟存储切片最大值设置
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
  • 切片机制
    生成切片过程包括:虚拟存储过程和切片过程二部分
    Hadoop InputFormat 数据处理机制
  1. 虚拟储存过程
    将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)
    例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件
  2. 切片过程
    (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
    (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
    (c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
    1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
    最终会形成3个切片,大小分别为:
    (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
  • 实际案例
  1. 需求
    将输入的大量小文件合并成一个切片统一处理。
    (1)输入数据
    准备4个小文件
    (2)期望
    期望一个切片处理4个文件
  2. 实现过程
    (1)不做任何处理,运行1.6节的WordCount案例程序,观察切片个数为4。
    Hadoop InputFormat 数据处理机制
    (2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3。
    Hadoop InputFormat 数据处理机制
    Hadoop InputFormat 数据处理机制
    (3)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1。
    Hadoop InputFormat 数据处理机制
    Hadoop InputFormat 数据处理机制

KeyValueTextInputFormat

  • 切片方法
    继承自FileInputFormat的切片方法
  • 应用场景
    每一行为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t") 来指定分隔符。默认的分隔符是tab
  • 案例分析
    输入字符串,如下图所示
    Hadoop InputFormat 数据处理机制
    按 ——> 为分隔符分割处理的键值对数据如下图所示
    Hadoop InputFormat 数据处理机制

NLineInputFormat

  • 切片机制
    如果使用NLineInputFormat,代表每个map进程处理的InputSplit不再按照Block块去划分,而是按照NLineInputFormat指定的行数N来划分。即是 输入文件的总行数/N=切片数,如果不能整除,切片数=商+1
  • 案例分析
    Hadoop InputFormat 数据处理机制

相关文章: