今天正式开始hadoop源码学习,此前尝试过hadoop3源码阅读,无奈发展到3,源码已过于复杂,难以看懂,于是尝试hadoop 0.1.1.2源码学习,发现虽然时隔久远,设计思想上还是一脉相承。今天来看DataNode类
DataNode 类实现了 FSConstants(FS常数)和Runnable接口,FSConstants来自于同一包下。FSConstants定义了许多常数,第一个是MIN_BLOCKS_FOR_WRITE(最小的可写块 ), 还有诸多基于不同客户分别和NameNode和DataNode之间交互的信号量以及一些常量值,下面是FSConstants接口的部分代码,有很多熟悉的常量值。
DataNode 第一个静态变量是org.apache.commons.logging.Log对象,估计用于DataNode的日志记录
定义了一个静态方法public static InetSocketAddress createSocketAddr(String target ) throws IOException
target为符合规则的ip地址及端口号信息,该方法主要用于将一个地址字符串转化为一个java.net.InetSocketAddress
DatanodeProtocol namenode; 数据节点协议,用于向NameNode上传信息与报告块信息,这也是NameNode和DataNode的唯一通信方式。
DatanodeProtocol接口中主要包含了register方法(如果DataNode没有注册ID则为其返回一个ID) sendHeartbeat方法(告诉NameNode它还存活) public Block[] blockReport(向NameNode报告所有本地块) blockReceived (向NameNode报告最新写入块)
errorReport(用在debug中,向namenode报告错误信息)
FSDataset data; FS数据集,控制着整个Data数据集,每个块都有独特的名字存储在磁盘上。
DatanodeRegistration dnRegistration; DataNode注册,包含了DataNode向NameNode通信完整的认证信息,该信息由Datanode随每个通信请求发送。
boolean shouldRun = true;
Vector receivedBlockList = new Vector();
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
long blockReportInterval; 块数据报告间隔
long heartBeatInterval; 心跳时间间隔
private DataStorage storage = null; 在启动期间,datanode读取其数据存储文件。 数据存储文件存储在所有dfs.data.dir目录中。 它包含版本和storageID。 Datanode在运行时会锁定所有dataStorage文件,因此其他datanode无法开始使用相同的数据存储。 当datanode停止(正常或异常)时,将释放锁定。
private static InetSocketAddress nameNodeAddr;
private static DataNode datanodeObject = null;
/**
 * In-memory counters for this datanode. Every update pushes the new
 * running total into the shared "dfs"/"datanode" metrics record.
 */
private class DataNodeMetrics {
    private MetricsRecord metricsRecord = null; // sink for the counters below
    private long bytesWritten = 0L;             // declared but not updated in this excerpt
    private long bytesRead = 0L;
    private long blocksWritten = 0L;
    private long blocksRead = 0L;
    private long blocksReplicated = 0L;
    private long blocksRemoved = 0L;

    DataNodeMetrics() {
        // Bind to the metrics record for the "dfs" context, "datanode" table.
        this.metricsRecord = Metrics.createRecord("dfs", "datanode");
    }

    /** Accumulate bytes read from disk and publish the running total. */
    synchronized void readBytes(int nbytes) {
        this.bytesRead = this.bytesRead + nbytes;
        Metrics.report(this.metricsRecord, "bytes_read", this.bytesRead);
    }

    /** Accumulate blocks served to readers and publish the running total. */
    synchronized void readBlocks(int nblocks) {
        this.blocksRead = this.blocksRead + nblocks;
        Metrics.report(this.metricsRecord, "blocks_read", this.blocksRead);
    }

    /** Accumulate blocks written locally and publish the running total. */
    synchronized void wroteBlocks(int nblocks) {
        this.blocksWritten = this.blocksWritten + nblocks;
        Metrics.report(this.metricsRecord, "blocks_written", this.blocksWritten);
    }

    /** Accumulate blocks replicated to this node and publish the running total. */
    synchronized void replicatedBlocks(int nblocks) {
        this.blocksReplicated = this.blocksReplicated + nblocks;
        Metrics.report(this.metricsRecord, "blocks_replicated", this.blocksReplicated);
    }

    /** Accumulate blocks deleted from this node and publish the running total. */
    synchronized void removedBlocks(int nblocks) {
        this.blocksRemoved = this.blocksRemoved + nblocks;
        Metrics.report(this.metricsRecord, "blocks_removed", this.blocksRemoved);
    }
}
DataNodeMetrics myMetrics = new DataNodeMetrics(); //创建的指标记录对象
DataNode(Configuration conf, String[] dataDirs) throws IOException {
    // Convenience constructor: delegate to the multi-arg constructor,
    // placing this node in the default rack (NetworkTopology.DEFAULT_RACK).
    this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
}
DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
    // Delegate to the full constructor: resolve the local hostname and the
    // NameNode address (parsed from "fs.default.name") before handing off.
    this(InetAddress.getLocalHost().getHostName(),
         networkLoc,
         dataDirs,
         createSocketAddr(conf.get("fs.default.name", "local")), conf);
    // Embedded HTTP status server; port defaults to 50075 when
    // dfs.datanode.info.port is not set in the configuration.
    int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
    String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
    this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
    //create a servlet to serve full-file content
    this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
    this.infoServer.start();
    this.dnRegistration.infoPort = this.infoServer.getPort();
    // register datanode
    try {
        // Register with the NameNode at startup: report which storage this
        // node is using and receive a registration ID.
        register();
    } catch (IOException ie) {
        // Registration failed: best-effort teardown of the info server,
        // then propagate the original failure to the caller.
        try {
            infoServer.stop();
        } catch (Exception e) {
            // ignored — already failing with the original IOException
        }
        throw ie;
    }
    // Publish this instance in the static field (see datanodeObject above).
    datanodeObject = this;
}
private DataNode(String machineName,
                 String networkLoc,
                 String[] dataDirs,
                 InetSocketAddress nameNodeAddr,
                 Configuration conf ) throws IOException {
    // Turn the configured data directory paths into File handles.
    File[] volumes = new File[dataDirs.length];
    for (int idx = 0; idx < dataDirs.length; idx++) {
        volumes[idx] = new File(dataDirs[idx]);
    }
    // use configured nameserver & interface to get local hostname
    machineName =
        DNS.getDefaultHost
        (conf.get("dfs.datanode.dns.interface","default"),
         conf.get("dfs.datanode.dns.nameserver","default"));
    // get storage info and lock the data dirs; directories already locked by
    // another datanode instance cannot be reused.
    storage = new DataStorage( volumes );
    int numDirs = storage.getNumLocked();
    if (numDirs == 0) { // all data dirs are in use
        throw new IOException("Cannot start multiple Datanode instances "
                              + "sharing the same data directories.\n"
                              + StringUtils.arrayToString(dataDirs) + " are locked. ");
    }
    // From here on, use only the directories we actually locked.
    volumes = storage.getLockedDirs();
    // connect to name node: blocks, retrying until the DatanodeProtocol
    // proxy is obtained (retry cadence is defined by RPC.waitForProxy —
    // TODO confirm the often-quoted "once per second" against that method).
    this.namenode = (DatanodeProtocol)
        RPC.waitForProxy(DatanodeProtocol.class,
                         DatanodeProtocol.versionID,
                         nameNodeAddr,
                         conf);
    // find free port: start at dfs.datanode.port (default 50010) and probe
    // upward until a ServerSocket binds successfully.
    ServerSocket ss = null;
    int tmpPort = conf.getInt("dfs.datanode.port", 50010);
    String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
    while (ss == null) {
        try {
            ss = new ServerSocket(tmpPort,0,InetAddress.getByName(bindAddress));
            LOG.info("Opened server at " + tmpPort);
        } catch (IOException ie) {
            LOG.info("Could not open server at " + tmpPort + ", trying new port");
            tmpPort++;
        }
    }
    // construct registration: identity sent to the NameNode on every request
    this.dnRegistration = new DatanodeRegistration(
        DFS_CURRENT_VERSION,
        machineName + ":" + tmpPort,
        storage.getStorageID(),
        -1,
        "" );
    this.networkLoc = networkLoc;
    // initialize data node internal structure
    this.data = new FSDataset(volumes, conf);
    this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
    long blockReportIntervalBasis =
        conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
    // Shorten the interval by a random amount (< 10% of the basis) —
    // presumably to stagger block reports across datanodes rather than to
    // compensate for execution time; TODO confirm against the NameNode side.
    this.blockReportInterval =
        blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
    this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
    this.nameNodeAddr = nameNodeAddr;
}
/**
 * Shut down this instance of the datanode.
 * Returns only after shutdown is complete.
 */
public void shutdown() {
    try {
        infoServer.stop();
    } catch (Exception e) {
        // best-effort: ignore failures stopping the HTTP status server
    }
    // Signal the service loops to exit, then kill the block-transfer server.
    this.shouldRun = false;
    ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
    try {
        // Release the locks held on all data directories.
        this.storage.closeAll();
    } catch (IOException ie) {
        // best-effort: locks are released on process exit anyway
    }
}
/**
 * Handle a fatal disk error: notify the NameNode, then take this
 * datanode down. The report is best-effort — shutdown proceeds even
 * if the NameNode cannot be reached.
 */
void handleDiskError( String errMsgr ) {
    LOG.warn("DataNode is shutting down.\n" + errMsgr);
    try {
        namenode.errorReport(dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
    } catch (IOException ignoredReportFailure) {
        // Nothing more we can do — we are shutting down regardless.
    }
    shutdown();
}
/**
 * Server used for receiving/sending a block of data.
 * This is created to listen for requests from clients or
 * other DataNodes. This small server does not use the
 * Hadoop IPC mechanism.
 */
class DataXceiveServer implements Runnable {
    // Accept-loop flag; cleared by kill() during shutdown.
    boolean shouldListen = true;
    ServerSocket ss;
    public DataXceiveServer(ServerSocket ss) {
        this.ss = ss;
    }
    /**
     * Accept connections until kill() is called, spawning one
     * DataXceiver daemon thread per accepted socket.
     */
    public void run() {
        try {
            while (shouldListen) {
                Socket s = ss.accept(); // blocks until a connection arrives
                //s.setSoTimeout(READ_TIMEOUT);
                data.checkDataDir(); // throws DiskErrorException if a data dir went bad
                xceiverCount.incr();
                new Daemon(new DataXceiver(s)).start();
            }
            ss.close();
        } catch (DiskErrorException de ) {
            String errMsgr = de.getMessage();
            LOG.warn("Exiting DataXceiveServer due to "+ errMsgr );
            // Disk failure is fatal: report to the NameNode and shut down.
            handleDiskError(errMsgr);
        } catch (IOException ie) {
            // Includes the exception raised when kill() closes the socket.
            LOG.info("Exiting DataXceiveServer due to " + ie.toString());
        }
    }
    /**
     * Stop listening. Clearing the flag alone is not enough because the
     * thread may be blocked in accept(), so the server socket is closed too.
     */
    public void kill() {
        this.shouldListen = false;
        try {
            this.ss.close();
        } catch (IOException iex) {
            // best-effort close
        }
    }
}
/**
* Thread for processing incoming/outgoing data stream
*/
class DataXceiver implements Runnable {
Socket s;
public DataXceiver(Socket s) {
    // Hold the accepted connection; the actual work happens in run().
    this.s = s;
    LOG.debug("Number of active connections is: "+xceiverCount);
}
/**
* Read/write data from/to the DataXceiveServer.
*/
public void run() {
try {
DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
try {
// 读取指令
byte op = (byte) in.read();
// 如果是写指令
if (op == OP_WRITE_BLOCK) {
writeBlock(in);
//如果是读指令 } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||
op == OP_READ_RANGE_BLOCK) {
readBlock(in, op);
} else {
while (op >= 0) {
System.out.println("Faulty op: " + op);
op = (byte) in.read();
}
throw new IOException("Unknown opcode for incoming data stream");
}
} finally {
in.close();
}
} catch (IOException ie) {
LOG.warn("DataXCeiver", ie);
} finally {
try {
xceiverCount.decr();
LOG.debug("Number of active connections is: "+xceiverCount);
s.close();
} catch (IOException ie2) {
}
}
}
/**
 * Read a block from the disk and stream it back over the socket.
 * Request format (after the opcode byte): the Block header, then for
 * OP_READSKIP_BLOCK a skip offset (long), and for OP_READ_RANGE_BLOCK
 * a skip offset plus an end offset (both long).
 * @param in The stream to read from
 * @param op OP_READ_BLOCK, OP_READSKIP_BLOCK or OP_READ_RANGE_BLOCK
 * @throws IOException
 */
private void readBlock(DataInputStream in, byte op) throws IOException {
    //
    // Read in the header
    //
    Block b = new Block();
    b.readFields(in);
    long toSkip = 0;
    long endOffset = -1; // -1 means "to end of block"; fixed up below
    if (op == OP_READSKIP_BLOCK) {
        toSkip = in.readLong();
    } else if (op == OP_READ_RANGE_BLOCK) {
        toSkip = in.readLong();
        endOffset = in.readLong();
    }
    //
    // Open reply stream
    //
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
    try {
        //
        // Write filelen of -1 if error
        //
        if (! data.isValidBlock(b)) {
            out.writeLong(-1);
        } else {
            //
            // Get blockdata from disk
            //
            long len = data.getLength(b);
            if (endOffset < 0) { endOffset = len; }
            DataInputStream in2 = new DataInputStream(data.getBlockData(b));
            out.writeLong(len);
            long amtSkipped = 0;
            if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {
                // Clamp the skip to the block length and report back how
                // much was actually skipped.
                if (toSkip > len) {
                    toSkip = len;
                }
                try {
                    amtSkipped = in2.skip(toSkip);
                } catch (IOException iex) {
                    // Local disk read failed — fatal for this datanode.
                    shutdown();
                    throw iex;
                }
                out.writeLong(amtSkipped);
            }
            if (op == OP_READ_RANGE_BLOCK) {
                // Clamp the requested end offset to the block length, echo it.
                if (endOffset > len) {
                    endOffset = len;
                }
                out.writeLong(endOffset);
            }
            byte buf[] = new byte[BUFFER_SIZE];
            try {
                // NOTE(review): the "+ 1" makes the range look inclusive of
                // endOffset — confirm against the protocol's reader side.
                int toRead = (int) (endOffset - amtSkipped + 1);
                int bytesRead = 0;
                try {
                    bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                    // NOTE(review): read() returns -1 at EOF, which would be
                    // added to the metric as-is — verify upstream guarantees.
                    myMetrics.readBytes(bytesRead);
                } catch (IOException iex) {
                    shutdown();
                    throw iex;
                }
                // Copy block bytes to the socket until the range is served
                // or the local stream is exhausted.
                while (toRead > 0 && bytesRead >= 0) {
                    out.write(buf, 0, bytesRead);
                    toRead -= bytesRead;
                    if (toRead > 0) {
                        try {
                            bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                            myMetrics.readBytes(bytesRead);
                        } catch (IOException iex) {
                            shutdown();
                            throw iex;
                        }
                    }
                }
            } catch (SocketException se) {
                // This might be because the reader
                // closed the stream early
            } finally {
                try {
                    in2.close();
                } catch (IOException iex) {
                    shutdown();
                    throw iex;
                }
            }
        }
        myMetrics.readBlocks(1);
        LOG.info("Served block " + b + " to " + s.getInetAddress());
    } finally {
        out.close();
    }
}
// 从这里也可以看出,可以在一个进程内一次性关闭所有已启动的datanode
/**
 * Shut down all datanodes that were started via the run(conf) method.
 * Returns only after shutdown is complete.
 */
public static void shutdownAll(){
    // The emptiness guard in the original was redundant: iterating an
    // empty list simply does nothing.
    for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
        ((DataNode) iterator.next()).shutdown();
    }
}
………(中间还省略了offerService、register、writeBlock等很多方法,篇幅太多,此处不再逐一分析)
/**
 * Entry point: build the default configuration, then start a datanode
 * and block until it exits. Any failure — including Errors — is logged
 * and the process exits with a nonzero status.
 */
public static void main(String args[]) throws IOException {
    try {
        Configuration conf = new Configuration();
        runAndWait(conf, getNetworkLoc(args, conf));
    } catch (Throwable t) {
        LOG.error(StringUtils.stringifyException(t));
        System.exit(-1);
    }
}
累累累累