最近工作需要,看了HDFS读写数据块这部分。不过可能跟网上大部分帖子不一样,本文主要写了${dfs.data.dir}的选择策略,也就是block在DataNode上的放置策略。我主要是从我们工作需要的角度来读这部分代码的。  

 

1 hdfs-site.xml
2 <property>
3   <name>dfs.data.dir</name>
4   <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
5 </property>

   所谓${dfs.data.dir}的选择策略,就是当DataNode配置有多个${dfs.data.dir}目录时(如上面的配置),该选择哪个目录来存放block。一般多个硬盘分别挂载到不同的${dfs.data.dir}下,所以存储block是要决定block该放到哪个磁盘上。

  创建文件总共有两步:

  1、在写block之前,需要与NameNode通信来生成文件(INodeFile、INodeFileUnderConstruction)。首先在DFSClient端的create()方法中发起写请求,然后通过RPC由NameNode最终调用FSNameSystem的startFileInternal()方法来创建文件。

  1   private void startFileInternal(String src,
  2                                               PermissionStatus permissions,
  3                                               String holder, 
  4                                               String clientMachine, 
  5                                               boolean overwrite,
  6                                               boolean append,
  7                                               boolean createParent,
  8                                               short replication,
  9                                               long blockSize
 10                                               ) throws IOException {
 11     if (NameNode.stateChangeLog.isDebugEnabled()) {
 12       NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
 13           + ", holder=" + holder
 14           + ", clientMachine=" + clientMachine
 15           + ", createParent=" + createParent
 16           + ", replication=" + replication
 17           + ", overwrite=" + overwrite
 18           + ", append=" + append);
 19     }
 20 
 21     FSPermissionChecker pc = getPermissionChecker();
 22     synchronized (this) {
 23       if (isInSafeMode())
 24         throw new SafeModeException("Cannot create " + src, safeMode);
 25       if (!DFSUtil.isValidName(src)) {
 26         throw new IOException("Invalid name: " + src);
 27       }
 28 
 29       // Verify that the destination does not exist as a directory already.
 30       boolean pathExists = dir.exists(src);
 31       if (pathExists && dir.isDir(src)) {
 32         throw new IOException("Cannot create "+ src + "; already exists as a directory");
 33       }
 34 
 35       if (isPermissionEnabled) {
 36         if (append || (overwrite && pathExists)) {
 37           checkPathAccess(pc, src, FsAction.WRITE);
 38         } else {
 39           checkAncestorAccess(pc, src, FsAction.WRITE);
 40         }
 41       }
 42 
 43       if (!createParent) {
 44         verifyParentDir(src);
 45       }
 46 
 47       try {
 48         INode myFile = dir.getFileINode(src); //根据路径寻找该文件
 49         recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 50 
 51         try {
 52           verifyReplication(src, replication, clientMachine);
 53         } catch (IOException e) {
 54           throw new IOException("failed to create " + e.getMessage());
 55         }
 56         if (append) {//若是追加操作
 57           if (myFile == null) {
 58             throw new FileNotFoundException("failed to append to non-existent "
 59                 + src + " on client " + clientMachine);
 60           } else if (myFile.isDirectory()) {
 61             throw new IOException("failed to append to directory " + src
 62                 + " on client " + clientMachine);
 63           }
 64         } else if (!dir.isValidToCreate(src)) {
 65           if (overwrite) {//允许覆盖原来的文件
 66             delete(src, true);
 67           } else {
 68             throw new IOException("failed to create file " + src
 69                 + " on client " + clientMachine
 70                 + " either because the filename is invalid or the file exists");
 71           }
 72         }
 73 
 74         DatanodeDescriptor clientNode = host2DataNodeMap
 75             .getDatanodeByHost(clientMachine);
 76 
 77         if (append) {
 78           //
 79           // Replace current node with a INodeUnderConstruction.
 80           // Recreate in-memory lease record.
 81           //
 82           INodeFile node = (INodeFile) myFile;
 83           INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
 84               node.getLocalNameBytes(), node.getReplication(),
 85               node.getModificationTime(), node.getPreferredBlockSize(),
 86               node.getBlocks(), node.getPermissionStatus(), holder,
 87               clientMachine, clientNode);
 88           dir.replaceNode(src, node, cons);
 89           leaseManager.addLease(cons.clientName, src);
 90 
 91         } else {
 92           // Now we can add the name to the filesystem. This file has no
 93           // blocks associated with it.
 94           //
 95           checkFsObjectLimit();
 96 
 97           // increment global generation stamp
 98           long genstamp = nextGenerationStamp();
 99           INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
100               replication, blockSize, holder, clientMachine, clientNode,
101               genstamp);
102           if (newNode == null) {
103             throw new IOException("DIR* startFile: Unable to add to namespace");
104           }
105           leaseManager.addLease(newNode.clientName, src);
106           if (NameNode.stateChangeLog.isDebugEnabled()) {
107             NameNode.stateChangeLog.debug("DIR* startFile: "
108                                        +"add "+src+" to namespace for "+holder);
109           }
110         }
111       } catch (IOException ie) {
112         NameNode.stateChangeLog.warn("DIR* startFile: "
113                                      +ie.getMessage());
114         throw ie;
115       }
116     }
117   }
startFileInternal()

相关文章: