将数据写入 DataNodes 的代码存在于 2 个文件中:
-
DFSOutputStream.java(包:org.apache.hadoop.hdfs)
客户端写入的数据被拆分成数据包(通常为 64k 大小)。当数据包准备好时,数据会被排入数据队列,由DataStreamer 提取。
-
DataStreamer(包:org.apache.hadoop.hdfs)
它拾取数据队列中的数据包并将它们发送到管道中的数据节点(通常在数据管道中有 3 个数据节点,因为复制因子为 3)。
它检索一个新的块 ID 并开始将数据流式传输到数据节点。当一个数据块被写入时,它会关闭当前数据块并获取一个新的数据块用于写入下一组数据包。
获取新块的代码如下:
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) {
LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
当前块关闭的代码如下:
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
在endBlock() 方法中,舞台再次设置为:
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
这意味着,创建了一个新管道,用于将下一组数据包写入新块。
编辑:如何检测块结束?
由于DataStreamer 不断将数据附加到块中,它会更新写入的字节数。
/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}
它还会不断检查写入的字节数是否等于块大小:
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
在上面的语句中,以下条件检查是否达到块大小:
getStreamer().getBytesCurBlock() == blockSize)
如果遇到块边界,则调用endBlock() 方法:
/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
这将确保当前块被关闭并从Name Node获得一个新块用于写入数据。
块大小由hdfs-site.xml文件中的dfs.blocksize参数确定(在我的集群中设置为128 MB = 134217728):
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>