MapReduce核心思想
- 1)分布式的运算程序往往需要分成至少2个阶段。
- 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
- 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
- 4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
1. InputFormat
(切片| 把切片变成k,v值)数据输入
一. 默认的FileInputFormat--TextInputFormat
public abstract class FileInputFormat<K, V> extends InputFormat<K, V>
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
Rich leaning form --每条记录对应的键值对---> (0,Rich leaning form )
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}
默认切片规则的实现类FileInputFormat;默认把切片变成key,value值的实现类为TextInputFormat(按行读取每条记录,键是存储该行在整个文件中的起始偏移量,LongWritable类型;值是这行内容 Text类
型),它返回的RecordReader类型为LineRecordReader。
MapTask的数量是由InputFormat来指定的,InputFormat生成多少个InputSpilt切片数就会有多少个task。
切片与MapTask并行度决定机制
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
yarn优化策略,本地启动任务启动MapTask,尽量不产生网络IO;按照一个个文件来切,判断是否大于128M;切片数量默认>=文件数量。
切片的原理
FileInputFormat切片源码解析
FileInputFormat切片大小参数设置
Job提交流程
源码
waitForCompletion() submit(); // 1建立连接 connect(); // 1)创建提交Job的代理 new Cluster(getConfiguration()); // (1)判断是本地yarn还是远程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)创建给集群提交数据的Stag路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)获取jobid ,并创建Job路径 JobID jobId = submitClient.getNewJobID(); // 3)拷贝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)计算切片,生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路径写XML配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交Job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());