Hadoop| MapperReduce02 框架原理

MapReduce核心思想

  • 1)分布式的运算程序往往需要分成至少2个阶段。
  • 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
  • 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
  • 4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

1. InputFormat

切片| 把切片变成k,v值)数据输入

一. 默认的FileInputFormat--TextInputFormat

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> 
public class TextInputFormat extends FileInputFormat<LongWritable, Text> 

Rich leaning form  --每条记录对应的键值对---> (0,Rich leaning form )
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) { String delimiter = context.getConfiguration().get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); return new LineRecordReader(recordDelimiterBytes); }

默认切片规则的实现类FileInputFormat;默认把切片变成key,value值的实现类为TextInputFormat(按行读取每条记录,键是存储该行在整个文件中的起始偏移量,LongWritable类型;值是这行内容 Text类

型),它返回的RecordReader类型为LineRecordReader。

MapTask的数量是由InputFormat来指定的InputFormat生成多少个InputSpilt切片数就会有多少个task

切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

Hadoop| MapperReduce02 框架原理

yarn优化策略,本地启动任务启动MapTask,尽量不产生网络IO;按照一个个文件来切,判断是否大于128M;切片数量默认>=文件数量。

切片的原理

FileInputFormat切片源码解析

Hadoop| MapperReduce02 框架原理

 Hadoop| MapperReduce02 框架原理

FileInputFormat切片大小参数设置

Hadoop| MapperReduce02 框架原理

Job提交流程

源码

Hadoop| MapperReduce02 框架原理
waitForCompletion()

submit();

// 1建立连接
    connect();    
        // 1)创建提交Job的代理
        new Cluster(getConfiguration());
            // (1)判断是本地yarn还是远程
            initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)创建给集群提交数据的Stag路径
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // 2)获取jobid ,并创建Job路径
    JobID jobId = submitClient.getNewJobID();

    // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);    
    rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);

// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
View Code

相关文章: