【问题标题】:Which Java file does Hadoop HDFS file splitting into BlocksHadoop HDFS 文件将哪个 Java 文件拆分为块
【发布时间】:2016-01-12 21:45:48
【问题描述】:

众所周知,当将本地文本文件复制到 HDFS 时,该文件将被拆分为固定大小的 128 MB。例如,当我将 256 MB 的文本文件复制到 HDFS 时,将有 2 个块 (256/128) 包含“拆分”文件。

谁能告诉我 Hadoop 2.7.1 源代码中的哪个 java/jar 文件具有将文件分成块的功能以及哪个 java/jar 文件将块写入数据节点的目录.

帮我追踪这段代码。

我只找到了他们对 FileInputFormat.java 中的块进行逻辑输入拆分的那个,这不是我需要的。我需要用于分割物理文件的 java 文件。

【问题讨论】:

    标签: java apache hadoop hdfs


    【解决方案1】:

    将数据写入 DataNodes 的代码存在于 2 个文件中:

    • DFSOutputStream.java(包:org.apache.hadoop.hdfs

      客户端写入的数据被拆分成数据包(通常为 64k 大小)。当数据包准备好时,数据会被排入数据队列,由DataStreamer 提取。

    • DataStreamer(包:org.apache.hadoop.hdfs

      它拾取数据队列中的数据包并将它们发送到管道中的数据节点(通常在数据管道中有 3 个数据节点,因为复制因子为 3)。

      它检索一个新的块 ID 并开始将数据流式传输到数据节点。当一个数据块被写入时,它会关闭当前数据块并获取一个新的数据块用于写入下一组数据包。

      获取新块的代码如下:

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Allocating new block");
        }
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      }
      

      当前块关闭的代码如下:

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!shouldStop() && ackQueue.size() != 0) {
            dataQueue.wait(1000);// wait for acks to arrive from datanodes
          }
        }
        if (shouldStop()) {
          continue;
        }
      
        endBlock();
      }
      

      endBlock() 方法中,舞台再次设置为:

      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
      

      这意味着,创建了一个新管道,用于将下一组数据包写入新块。

    编辑:如何检测块结束?

    由于DataStreamer 不断将数据附加到块中,它会更新写入的字节数。

    /**
      * increase bytes of current block by len.
      *
      * @param len how many bytes to increase to current block
      */
    void incBytesCurBlock(long len) {
        this.bytesCurBlock += len;
    }
    

    它还会不断检查写入的字节数是否等于块大小:

    // If packet is full, enqueue it for transmission
    //
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull();
    }
    

    在上面的语句中,以下条件检查是否达到块大小:

    getStreamer().getBytesCurBlock() == blockSize)
    

    如果遇到块边界,则调用endBlock() 方法:

    /**
     * if encountering a block boundary, send an empty packet to
     * indicate the end of block and reset bytesCurBlock.
     *
     * @throws IOException
     */
    protected void endBlock() throws IOException {
        if (getStreamer().getBytesCurBlock() == blockSize) {
          setCurrentPacketToEmpty();
          enqueueCurrentPacket();
          getStreamer().setBytesCurBlock(0);
          lastFlushOffset = 0;
        }
    }
    

    这将确保当前块被关闭并从Name Node获得一个新块用于写入数据。

    块大小由hdfs-site.xml文件中的dfs.blocksize参数确定(在我的集群中设置为128 MB = 134217728):

    <property>
        <name>dfs.blocksize</name>
        <value>134217728</value>
        <description>The default block size for new files, in bytes.
            You can use the following suffix (case insensitive): k(kilo),
            m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
            size (such as 128k, 512m, 1g, etc.), Or provide complete size
            in bytes (such as 134217728 for 128 MB).
        </description>
    </property>
    

    【讨论】:

    • 真的很棒的答案,但是这个 if 语句 if (one.isLastPacketInBlock()) { } 如何获取最大块大小的信息?代码的哪一部分表示要拆分为 128 MB 的文件?
    • 完美答案!只是为了确认一下,调用 enqueueCurrentPacketFull(); 的 if 语句是在 DFSOutputStream.java 中吧?
    • @Idris,是的,它位于DFSOutputStream.javaDFSOutputStreamDataStreamer 共同努力实现这一目标。一个创建数据包,另一个流式传输数据包。两者一起工作以跟踪块边界并根据需要获取新块。
    【解决方案2】:

    它不是一个 jar/java 文件,它具有拆分文件的功能。执行此任务的是客户端守护进程。当您从本地加载文件时,客户端首先仅读取 128MB,它通过询问 namenode 找到存储文件的位置,并确保文件被正确复制和复制。客户端在这个阶段不知道文件的实际大小,除非它以相同的方式读取所有块。

    当您要存储文件时,hdfs 不使用您提到的 FileInputFormat.java。当您想在该文件上运行任何 mapreduce 任务时使用它。它与文件的存储无关。

    【讨论】:

    • 感谢您的回答!但是当然不应该在“客户端守护程序任务”(您声称的)中有一个代码,该代码至少有一个 if 语句,可以连续将文件中的数据读取到一个块中,直到文件达到最大大小(128 MB) .
    猜你喜欢
    • 2021-10-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-13
    • 2021-03-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多