版本Apache Hadoop 2.7.2

运行环境:Windows10

Eclipse Debug: https://blog.csdn.net/qq_40794973/article/details/87876772

自定义InputFormat: https://blog.csdn.net/qq_40794973/article/details/87863896#t29


打断点:

WholeRecordReader类的: initialize(InputSplit split, TaskAttemptContext context) 方法

MapReduce_自定义InputFormat_debug

MapReduce_自定义InputFormat_debug

MapReduce_自定义InputFormat_debug

	/**
	 * 核心业务逻辑
	 * key = 文件路径+名称
	 * value = 小文件封装到BytesWritable里面
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {		
		if (isProgress) {
			// 1 定义缓存区
			byte[] contents = new byte[(int)split.getLength()];
			FileSystem fs = null;
			FSDataInputStream fis = null;
			try {
				// 2 获取文件系统
				Path path = split.getPath();
				fs = path.getFileSystem(configuration);//通过路径 (反推) 拿到文件系统
				// 3 读取数据
				fis = fs.open(path);
				// 4 读取文件内容
				IOUtils.readFully(fis, contents, 0, contents.length);//先把输入流读入缓冲区里面
				// 5 输出文件内容
				value.set(contents, 0, contents.length);//写入 BytesWritable 里面
				// 6 获取文件路径及名称
				String name = split.getPath().toString();
				// 7 设置输出的key值
				k.set(name);
			} catch (Exception e) {
				e.printStackTrace();
			}finally {
				IOUtils.closeStream(fis);//fs 先不关 不然会报错
			}
			isProgress = false;//读完后置为 false
			return true;
		}
		return false;
	}

首先到的是 nextKeyValue() 方法,该方法是在 map() 方法之前调用

nextKeyValue() 方法就是  Mapper 里面的

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

 

MapReduce_自定义InputFormat_debug

 

MapReduce_自定义InputFormat_debug

 

MapReduce_自定义InputFormat_debug

 

一直向下执行到方法结束,跳到了 MapTask 方法里面的 nextKeyValue()

MapReduce_自定义InputFormat_debug

 

MapReduce_自定义InputFormat_debug

据需向下执行,进入 MapContextImpl类  里面的 nextKeyValue() 方法

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }

继续 进入 WrappedMapper 类

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return mapContext.nextKeyValue();
    }

继续

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
->      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

MapReduce_自定义InputFormat_debug

 

MapReduce_自定义InputFormat_debug

 后面还会继续执行知道返回 false,这个 map 方法才结束

MapReduce_自定义InputFormat_debug

 

MapReduce_自定义InputFormat_debug

 

 

map 执行完成后,才会执行 reduce 方法,全速运行

MapReduce_自定义InputFormat_debug

 

 

 

 

相关文章:

  • 2021-12-06
  • 2021-06-26
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-01-05
  • 2021-09-17
猜你喜欢
  • 2021-08-29
  • 2022-01-27
  • 2021-08-02
  • 2022-12-23
  • 2021-10-30
  • 2021-10-16
相关资源
相似解决方案