今天正式开始hadoop源码学习,此前chang'shi尝试过hadoop3源码阅读,无奈发展到3,源码已过于复杂,难以看懂,于是尝试haddop0.1.1.2源码学习,发现虽然时隔jiu'y久远,设计思想上还是一脉相承。今天来看DataNode类

 

DataNode 类实现了 FSConstants(FS常数)和Runnable接口,FSConstants来自于同一包下。FSConstants定义了许多常数,第一个是MIN_BLOCKS_FOR_WRITE(最小的可写块 ), 还有诸多基于不同客户分别和NameNode和DataNode之间交互的信号量以及一些常量值,下面是FSConstants接口的部分代码,有很多熟悉的常量值。

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

hadoop(0.1.1.2)源码学习-org.apache.hadoop.dfs DataNode

DataNode 第一个静态变量是org.apache.commons.logging.Log对象,估计用于DataNode的日志记录

定义了一个静态方法public static InetSocketAddress createSocketAddr(String target ) throws IOException

target为符合规则的ip地址及端口号信息,该方法主要用于将一个地址字符串转化为一个java.net.InetSocketAddress

DatanodeProtocol namenode;  数据节点协议,用于向NameNode上传信息与报告块信息,这也是NameNode和DataNode的唯一通信方式。

DatanodeProtocol接口中主要包含了register方法(如果DataNode没有注册ID则为其返回一个ID) sendHeartbeat方法(告诉NameNode它还存活)   public Block[] blockReport(向NameNode报告所有本地块) blockReceived (向NameNode报告最新写入块)

errorReport(用在debug中,向namenode报告错误信息)

 

FSDataset data;  FS数据集,控制着整个Data数据集,每个块都有独特的名字存储在磁盘上。

DatanodeRegistration dnRegistration;   DataNode注册,包含了DataNode向NameNode通信完整的认证信息,该信息由Datanode随每个通信请求发送。

 

 boolean shouldRun = true;
    Vector receivedBlockList = new Vector();
    int xmitsInProgress = 0;
    Daemon dataXceiveServer = null;

long blockReportInterval;   块数据报告间隔
    long heartBeatInterval;    心跳时间间隔

private DataStorage storage = null;   在启动期间,datanode读取其数据存储文件。 数据存储文件存储在所有dfs.data.dir目录中。 它包含版本和storageID。 Datanode在运行时会锁定所有dataStorage文件,因此其他datanode无法开始使用相同的数据存储。 当datanode停止(正常或异常)时,将释放锁定。

 private static InetSocketAddress nameNodeAddr;
    private static DataNode datanodeObject = null;

private class DataNodeMetrics {
      private MetricsRecord metricsRecord = null;  // 接口,指标记录,用于记录系统的状态信息
      
      
      private long bytesWritten = 0L;
      private long bytesRead = 0L;
      private long blocksWritten = 0L;
      private long blocksRead = 0L;
      private long blocksReplicated = 0L;
      private long blocksRemoved = 0L;

DataNodeMetrics() {
        metricsRecord = Metrics.createRecord("dfs", "datanode");
      }            //初始化,设置记录
synchronized void readBytes(int nbytes) {
        bytesRead += nbytes;

//实际作用为向metricsRecord中添加bytes_read    =>bytesRead


        Metrics.report(metricsRecord, "bytes_read", bytesRead);       
      }

synchronized void readBlocks(int nblocks) {
        blocksRead += nblocks;
        Metrics.report(metricsRecord, "blocks_read", blocksRead);
      }
      
      synchronized void wroteBlocks(int nblocks) {
        blocksWritten += nblocks;
        Metrics.report(metricsRecord, "blocks_written", blocksWritten);
      }
      
      synchronized void replicatedBlocks(int nblocks) {
        blocksReplicated += nblocks;
        Metrics.report(metricsRecord, "blocks_replicated", blocksReplicated);
      }
      
      synchronized void removedBlocks(int nblocks) {
        blocksRemoved += nblocks;
        Metrics.report(metricsRecord, "blocks_removed", blocksRemoved);
      }
    }

 

DataNodeMetrics myMetrics = new DataNodeMetrics();  //创建的指标记录对象

 DataNode(Configuration conf, String[] dataDirs) throws IOException {

  // 调用了多参构造器, NetworkTopology.DEFAULT_RACK是默认的机架感知文件的位置
      this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
    }   

       

 DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {

// 继续调用更多参的构造器
        this(InetAddress.getLocalHost().getHostName(),
             networkLoc,
             dataDirs,
             createSocketAddr(conf.get("fs.default.name", "local")), conf);
        // register datanode

// 如果没有在配置文件中配置dfs.datanode.info.port 将设置为默认值50075
        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);


        String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
        this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
        //create a servlet to serve full-file content
        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
        this.infoServer.start();
        this.dnRegistration.infoPort = this.infoServer.getPort();
        // register datanode
        try {

//向namenode注册,数据节点在启动时需要向namenode注册.报告它现在正在使用哪个存储,以及接收一个注册ID
          register();  
        } catch (IOException ie) {
          try {
            infoServer.stop();
          } catch (Exception e) {
          }
          throw ie;
        }
        datanodeObject = this;
    }                       

 

private DataNode(String machineName,
                    String networkLoc,
                    String[] dataDirs, 
                    InetSocketAddress nameNodeAddr, 
                    Configuration conf ) throws IOException {
      File[] volumes = new File[dataDirs.length];
      for (int idx = 0; idx < dataDirs.length; idx++) {
        volumes[idx] = new File(dataDirs[idx]);
      }

      // use configured nameserver & interface to get local hostname
      machineName =
        DNS.getDefaultHost
        (conf.get("dfs.datanode.dns.interface","default"),
         conf.get("dfs.datanode.dns.nameserver","default"));
 
      // get storage info and lock the data dirs
      storage = new DataStorage( volumes );
      int numDirs = storage.getNumLocked();
      if (numDirs == 0) { // all data dirs are in use
        throw new IOException("Cannot start multiple Datanode instances "
                              + "sharing the same data directories.\n"
                              + StringUtils.arrayToString(dataDirs) + " are locked. ");
      }
      volumes = storage.getLockedDirs();
      // connect to name node

// 每1秒向namenode获取一次DataNode协议对象,循环直到获取为止
      this.namenode = (DatanodeProtocol) 
          RPC.waitForProxy(DatanodeProtocol.class,
                           DatanodeProtocol.versionID,
                           nameNodeAddr, 
                           conf);
      // find free port
      ServerSocket ss = null;
      int tmpPort = conf.getInt("dfs.datanode.port", 50010);
      String bindAddress = conf.get("dfs.datanode.bindAddress", "0.0.0.0");
      while (ss == null) {
        try {
          ss = new ServerSocket(tmpPort,0,InetAddress.getByName(bindAddress));
          LOG.info("Opened server at " + tmpPort);
        } catch (IOException ie) {
          LOG.info("Could not open server at " + tmpPort + ", trying new port");
          tmpPort++;
        }
      }
      // construct registration
      this.dnRegistration = new DatanodeRegistration(
                                        DFS_CURRENT_VERSION, 
                                        machineName + ":" + tmpPort, 
                                        storage.getStorageID(),
                                        -1,
                                        "" );
      this.networkLoc = networkLoc;
      // initialize data node internal structure
      this.data = new FSDataset(volumes, conf);
      this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));

      long blockReportIntervalBasis =
        conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);

// 减去了十分之一内的一段时间,可能设计者觉得实际代码执行还需要时间,所以等待时间要略有减少
      this.blockReportInterval =
        blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
      this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
      this.nameNodeAddr = nameNodeAddr;
    }

 

 

/**
     * Shut down this instance of the datanode.
     * Returns only after shutdown is complete.
     */

//关闭DataNode
 

 public void shutdown() {
        try {
          infoServer.stop();
        } catch (Exception e) {
        }
        this.shouldRun = false;
        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
        try {
          this.storage.closeAll();
        } catch (IOException ie) {
        }
    }

// 报告磁盘错误并关闭

    void handleDiskError( String errMsgr ) {
        LOG.warn( "DataNode is shutting down.\n" + errMsgr );
        try {
            namenode.errorReport(
                    dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
        } catch( IOException ignored) {              
        }
        shutdown();
    }

 

 

 /**
     * Server used for receiving/sending a block of data.
     * This is created to listen for requests from clients or 
     * other DataNodes.  This small server does not use the 
     * Hadoop IPC mechanism.
     */

 class DataXceiveServer implements Runnable {
        boolean shouldListen = true;
        ServerSocket ss;
        public DataXceiveServer(ServerSocket ss) {
            this.ss = ss;
        }

        /**
         */
        public void run() {
            try {
                while (shouldListen) {
                    Socket s = ss.accept();
                    //s.setSoTimeout(READ_TIMEOUT);
                    data.checkDataDir();
                    xceiverCount.incr();
                    new Daemon(new DataXceiver(s)).start();
                }
                ss.close();
            } catch (DiskErrorException de ) {
                String errMsgr = de.getMessage();
                LOG.warn("Exiting DataXceiveServer due to "+ errMsgr );
                handleDiskError(errMsgr);
            } catch (IOException ie) {
                LOG.info("Exiting DataXceiveServer due to " + ie.toString());
            }
        }
        public void kill() {
            this.shouldListen = false;
            try {
                this.ss.close();
            } catch (IOException iex) {
            }
        }
    }

 /**
     * Thread for processing incoming/outgoing data stream
     */
    class DataXceiver implements Runnable {
        Socket s;
        public DataXceiver(Socket s) {
            this.s = s;
            LOG.debug("Number of active connections is: "+xceiverCount);
        }

        /**
         * Read/write data from/to the DataXceiveServer.
         */

        public void run() {
            try {
                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
                try {

       // 读取指令
                    byte op = (byte) in.read();

// 如果是写指令
                    if (op == OP_WRITE_BLOCK) {
                        writeBlock(in);
//如果是读指令                 } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||
                        op == OP_READ_RANGE_BLOCK) {
                        readBlock(in, op);
                    } else {
                        while (op >= 0) {
                            System.out.println("Faulty op: " + op);
                            op = (byte) in.read();
                        }
                        throw new IOException("Unknown opcode for incoming data stream");
                    }
                } finally {
                    in.close();
                }
            } catch (IOException ie) {
              LOG.warn("DataXCeiver", ie);
            } finally {
                try {
                    xceiverCount.decr();
                    LOG.debug("Number of active connections is: "+xceiverCount);
                    s.close();
                } catch (IOException ie2) {
                }
            }
        }

/**
         * Read a block from the disk
         * @param in The stream to read from
         * @param op OP_READ_BLOCK or OP_READ_SKIPBLOCK
         * @throws IOException
         */

private void readBlock(DataInputStream in, byte op) throws IOException {
          //
          // Read in the header
          //
          Block b = new Block();
          b.readFields(in);

          long toSkip = 0;
          long endOffset = -1;
          if (op == OP_READSKIP_BLOCK) {
              toSkip = in.readLong();
          } else if (op == OP_READ_RANGE_BLOCK) {
            toSkip = in.readLong();
            endOffset = in.readLong();
          }

          //
          // Open reply stream
          //
          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
          try {
              //
              // Write filelen of -1 if error
              //
              if (! data.isValidBlock(b)) {
                  out.writeLong(-1);
              } else {
                  //
                  // Get blockdata from disk
                  //
                  long len = data.getLength(b);
                  if (endOffset < 0) { endOffset = len; }
                  DataInputStream in2 = new DataInputStream(data.getBlockData(b));
                  out.writeLong(len);

                  long amtSkipped = 0;
                  if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {
                      if (toSkip > len) {
                          toSkip = len;
                      }
                      try {
                          amtSkipped = in2.skip(toSkip);
                      } catch (IOException iex) {
                          shutdown();
                          throw iex;
                      }
                      out.writeLong(amtSkipped);
                  }
                  if (op == OP_READ_RANGE_BLOCK) {
                      if (endOffset > len) {
                        endOffset = len;
                      }
                      out.writeLong(endOffset);
                  }

                  byte buf[] = new byte[BUFFER_SIZE];
                  try {
                    int toRead = (int) (endOffset - amtSkipped + 1);
                      int bytesRead = 0;
                      try {
                          bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                          myMetrics.readBytes(bytesRead);
                      } catch (IOException iex) {
                          shutdown();
                          throw iex;
                      }
                      while (toRead > 0 && bytesRead >= 0) {
                          out.write(buf, 0, bytesRead);
                          toRead -= bytesRead;
                          if (toRead > 0) {
                          try {
                              bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
                              myMetrics.readBytes(bytesRead);
                          } catch (IOException iex) {
                              shutdown();
                              throw iex;
                          }
                          }
                      }
                  } catch (SocketException se) {
                      // This might be because the reader
                      // closed the stream early
                  } finally {
                      try {
                          in2.close();
                      } catch (IOException iex) {
                          shutdown();
                          throw iex;
                      }
                  }
              }
              myMetrics.readBlocks(1);
              LOG.info("Served block " + b + " to " + s.getInetAddress());
          } finally {
              out.close();
          }
        }

 

// 从这里也可以看出。可以在个namenode上关闭所有namenode

 /**
     * Shut down all datanodes that where started via the run(conf) method.
     * Returns only after shutdown is complete.
     */
    public static void shutdownAll(){
      if(!dataNodeList.isEmpty()){
        for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
          DataNode dataNode = (DataNode) iterator.next();
          dataNode.shutdown();
        }
      }
    }

 

......... 太多了

public static void main(String args[]) throws IOException {
      try {
        Configuration conf = new Configuration();
        runAndWait(conf, getNetworkLoc(args, conf));
      } catch ( Throwable e ) {
        LOG.error( StringUtils.stringifyException( e ) );
        System.exit(-1);
      }
    }

累累累累

相关文章:

  • 2021-09-14
  • 2022-02-17
  • 2021-11-16
  • 2022-12-23
  • 2022-12-23
  • 2021-05-30
  • 2021-04-04
  • 2021-07-19
猜你喜欢
  • 2021-04-25
  • 2022-12-23
  • 2021-12-30
  • 2021-12-05
  • 2021-08-13
相关资源
相似解决方案