运行环境:Windows10
Eclipse Debug: https://blog.csdn.net/qq_40794973/article/details/87876772
自定义InputFormat: https://blog.csdn.net/qq_40794973/article/details/87863896#t29
打断点:
WholeRecordReader类的: initialize(InputSplit split, TaskAttemptContext context) 方法
/**
* 核心业务逻辑
* key = 文件路径+名称
* value = 小文件封装到BytesWritable里面
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定义缓存区
byte[] contents = new byte[(int)split.getLength()];
FileSystem fs = null;
FSDataInputStream fis = null;
try {
// 2 获取文件系统
Path path = split.getPath();
fs = path.getFileSystem(configuration);//通过路径 (反推) 拿到文件系统
// 3 读取数据
fis = fs.open(path);
// 4 读取文件内容
IOUtils.readFully(fis, contents, 0, contents.length);//先把输入流读入缓冲区里面
// 5 输出文件内容
value.set(contents, 0, contents.length);//写入 BytesWritable 里面
// 6 获取文件路径及名称
String name = split.getPath().toString();
// 7 设置输出的key值
k.set(name);
} catch (Exception e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(fis);//fs 先不关 不然会报错
}
isProgress = false;//读完后置为 false
return true;
}
return false;
}
首先到的是 nextKeyValue() 方法,该方法是在 map() 方法之前调用
nextKeyValue() 方法就是 Mapper 里面的
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
一直向下执行到方法结束,跳到了 MapTask 方法里面的 nextKeyValue()
据需向下执行,进入 MapContextImpl类 里面的 nextKeyValue() 方法
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
继续 进入 WrappedMapper 类
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return mapContext.nextKeyValue();
}
继续
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
-> while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
后面还会继续执行知道返回 false,这个 map 方法才结束
map 执行完成后,才会执行 reduce 方法,全速运行