As the project has gone deeper, I've found a real drawback in the Hive metastore: its metadata operations are coupled with the code that operates on the physical cluster, which makes it hard to extend. For example, create_table validates and creates the table path inside the same code path that records the metadata:
```java
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
  if (tbl.getSd().getLocation() == null
      || tbl.getSd().getLocation().isEmpty()) {
    tblPath = wh.getTablePath(
        ms.getDatabase(tbl.getDbName()), tbl.getTableName());
  } else {
    if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
      LOG.warn("Location: " + tbl.getSd().getLocation()
          + " specified for non-external table:" + tbl.getTableName());
    }
    tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
  }
  tbl.getSd().setLocation(tblPath.toString());
}

if (tblPath != null) {
  if (!wh.isDir(tblPath)) {
    if (!wh.mkdirs(tblPath, true)) {
      throw new MetaException(tblPath
          + " is not a directory or unable to create one");
    }
    madeDir = true;
  }
}
```
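To make the coupling complaint concrete, here is a minimal plain-Java sketch of what a decoupled design could look like. All of the class and method names below are hypothetical (they are not Hive classes): the point is only that a pure metadata layer and a pure physical layer can live behind separate interfaces, with the orchestration above both.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: separating metadata persistence from physical
// path handling, instead of mixing both inside create_table.
public class DecoupledCreateTable {

    // Pure metadata layer: only records table -> location mappings.
    static class MetaStore {
        private final Map<String, String> tableLocations = new HashMap<>();
        void recordTable(String name, String location) {
            tableLocations.put(name, location);
        }
        String locationOf(String name) {
            return tableLocations.get(name);
        }
    }

    // Pure physical layer: only knows how to materialize a path.
    static class StorageHandler {
        boolean createDir(String path) {
            // Real code would call something like FileSystem.mkdirs;
            // here we just pretend the call succeeds.
            return true;
        }
    }

    // Orchestration lives above both layers, so either one can be
    // swapped out (e.g. a different filesystem) without touching
    // the other.
    static String createTable(MetaStore ms, StorageHandler sh,
                              String name, String location) {
        if (!sh.createDir(location)) {
            throw new RuntimeException(location + " could not be created");
        }
        ms.recordTable(name, location);
        return ms.locationOf(name);
    }

    public static void main(String[] args) {
        MetaStore ms = new MetaStore();
        String loc = createTable(ms, new StorageHandler(),
                                 "logs", "/warehouse/logs");
        System.out.println(loc);
    }
}
```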
Is this why the metastore cannot unify all metadata? Viewed from a distance, the Hive metastore code is essentially CRUD on metadata. From the previous walkthrough we saw that HiveMetaStore's init method creates three kinds of listeners (MetaStorePreEventListener, MetaStoreEventListener, and MetaStoreEndFunctionListener) to observe and record each event. At the same time, it also news up a Warehouse, which is what performs the physical filesystem operations.
```java
public void init() throws MetaException {
  rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
  initListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreInitListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
  for (MetaStoreInitListener singleInitListener : initListeners) {
    MetaStoreInitContext context = new MetaStoreInitContext();
    singleInitListener.onInit(context);
  }

  String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
      HiveAlterHandler.class.getName());
  alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
      alterHandlerName), hiveConf);
  wh = new Warehouse(hiveConf);
  // ...
}
```
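The init wiring above is a plain observer pattern: instantiate every configured listener, then invoke each one's callback with a shared context object. A self-contained sketch of just that pattern, with hypothetical names standing in for MetaStoreInitListener and MetaStoreInitContext:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini version of the listener wiring done in init():
// build the configured listeners, then fire onInit on each of them.
public class ListenerInit {

    interface InitListener {
        void onInit(InitContext ctx);
    }

    static class InitContext {
        final List<String> log = new ArrayList<>();
    }

    static List<String> runInit(List<InitListener> listeners) {
        InitContext ctx = new InitContext();
        for (InitListener l : listeners) {
            l.onInit(ctx);  // every hook observes the same context
        }
        return ctx.log;
    }

    public static void main(String[] args) {
        List<InitListener> ls = new ArrayList<>();
        ls.add(ctx -> ctx.log.add("warehouse ready"));
        ls.add(ctx -> ctx.log.add("listeners registered"));
        System.out.println(runInit(ls));
    }
}
```

The real metastore adds one wrinkle on top of this: the listener classes are loaded reflectively from configuration (METASTORE_INIT_HOOKS), so new hooks can be plugged in without recompiling the server.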
Next, starting from the metadata life cycle, let's study the life cycle of a Partition. In HiveMetaStoreClient, add_partition is a good entry point: it is what runs when an insert overwrite writes into a table partitioned by some column, for example dt=20170830. Its sibling add_partitions is hit when data is loaded after a partitioned table is created and multiple partition paths must be set up. Let's take add_partitions as the example:
```java
public int add_partitions(List<Partition> new_parts)
    throws InvalidObjectException, AlreadyExistsException, MetaException,
    TException {
  return client.add_partitions(new_parts);
}

@Override
public List<Partition> add_partitions(
    List<Partition> parts, boolean ifNotExists, boolean needResults)
    throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
  if (parts.isEmpty()) {
    return needResults ? new ArrayList<Partition>() : null;
  }
  Partition part = parts.get(0);
  AddPartitionsRequest req = new AddPartitionsRequest(
      part.getDbName(), part.getTableName(), parts, ifNotExists);
  req.setNeedResult(needResults);
  AddPartitionsResult result = client.add_partitions_req(req);
  return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
}
```
The client here is a ThriftHiveMetastore.Iface object, whose server-side implementation is HiveMetaStore, created through the init method we saw above. The arguments are wrapped into an AddPartitionsRequest. This class is really just partition attributes, but the benefit of the wrapping is that later calls no longer need to re-fetch the partition's DbName, TableName and so on: everything is captured once so the object can be reused directly downstream. Next, we follow client.add_partitions_req. A warning about the code below: it's long, so we'll work through it piece by piece.
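The benefit described above (capture dbName, tableName and the partition list once, so downstream code never re-derives them) is the classic request-object pattern. A minimal sketch, with a hypothetical class standing in for the Thrift-generated AddPartitionsRequest:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the request-object pattern behind
// AddPartitionsRequest: bundle the shared fields once up front.
public class PartitionsRequestSketch {

    static class PartitionsRequest {
        final String dbName;
        final String tableName;
        final List<String> partValues;
        boolean needResult;  // optional flag, set after construction

        PartitionsRequest(String dbName, String tableName,
                          List<String> partValues) {
            this.dbName = dbName;
            this.tableName = tableName;
            this.partValues = partValues;
        }
    }

    // Every downstream step reads from the request object instead of
    // re-fetching db/table names from each individual partition.
    static String describe(PartitionsRequest req) {
        return req.dbName + "." + req.tableName
            + " +" + req.partValues.size() + " partitions";
    }

    public static void main(String[] args) {
        PartitionsRequest req = new PartitionsRequest(
            "default", "logs", Arrays.asList("dt=20170830", "dt=20170831"));
        req.needResult = true;
        System.out.println(describe(req));
    }
}
```

A side benefit in the real API: because the request travels as one Thrift struct, new optional fields (like needResult) can be added later without changing the method signature.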
```java
private List<Partition> add_partitions_core(
    RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
    throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
  logInfo("add_partitions");
  boolean success = false;
  // Ensures that the list doesn't have dups, and keeps track of directories we have created.
  Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
  List<Partition> result = new ArrayList<Partition>();
  List<Partition> existingParts = null;
  Table tbl = null;
  try {
    ms.openTransaction();
    tbl = ms.getTable(dbName, tblName);
    if (tbl == null) {
      throw new InvalidObjectException("Unable to add partitions because "
          + "database or table " + dbName + "." + tblName + " does not exist");
    }

    if (!parts.isEmpty()) {
      firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
    }

    for (Partition part : parts) {
      if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
        throw new MetaException("Partition does not belong to target table "
            + dbName + "." + tblName + ": " + part);
      }
      boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
      if (!shouldAdd) {
        if (existingParts == null) {
          existingParts = new ArrayList<Partition>();
        }
        existingParts.add(part);
        LOG.info("Not adding partition " + part + " as it already exists");
        continue;
      }
      boolean madeDir = createLocationForAddedPartition(tbl, part);
      if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
        // Technically, for ifNotExists case, we could insert one and discard the other
        // because the first one now "exists", but it seems better to report the problem
        // upstream as such a command doesn't make sense.
        throw new MetaException("Duplicate partitions in the list: " + part);
      }
      initializeAddedPartition(tbl, part, madeDir);
      result.add(part);
    }
    if (!result.isEmpty()) {
      success = ms.addPartitions(dbName, tblName, result);
    } else {
      success = true;
    }
    success = success && ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
      for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
        if (e.getValue()) {
          wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
          // we just created this directory - it's not a case of pre-creation, so we nuke
        }
      }
      fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
    } else {
      fireMetaStoreAddPartitionEvent(tbl, result, null, true);
      if (existingParts != null) {
        // The request has succeeded but we failed to add these partitions.
        fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
      }
    }
  }
  return result;
}
```
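The finally block above is worth pausing on: the addedPartitions map remembers, per partition, whether this call actually created its directory (madeDir), and on a failed transaction it deletes exactly those directories and no others, so pre-existing data is never nuked. A self-contained sketch of that compensation bookkeeping, with hypothetical names and directories modeled as plain strings:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of add_partitions_core's cleanup bookkeeping:
// track which partition directories *we* created, and on failure
// remove only those, leaving pre-existing directories untouched.
public class PartitionRollbackSketch {

    static List<String> addPartitions(List<String> parts,
                                      List<String> preExistingDirs,
                                      boolean failCommit) {
        // partition -> did this call create its directory?
        Map<String, Boolean> addedPartitions = new LinkedHashMap<>();
        List<String> liveDirs = new ArrayList<>(preExistingDirs);
        boolean success = false;
        try {
            for (String p : parts) {
                boolean madeDir = !liveDirs.contains(p);
                if (madeDir) {
                    liveDirs.add(p);          // stands in for wh.mkdirs
                }
                addedPartitions.put(p, madeDir);
            }
            success = !failCommit;            // stands in for commitTransaction
        } finally {
            if (!success) {
                for (Map.Entry<String, Boolean> e : addedPartitions.entrySet()) {
                    if (e.getValue()) {
                        liveDirs.remove(e.getKey());  // stands in for wh.deleteDir
                    }
                }
            }
        }
        return liveDirs;
    }

    public static void main(String[] args) {
        // dt=01 already existed; a failed commit must keep it but
        // remove dt=02, which this call created.
        System.out.println(addPartitions(
            List.of("dt=01", "dt=02"), List.of("dt=01"), true));
    }
}
```

Note also how the real code uses the return value of Map.put to detect duplicates in the input list: put returns the previous value for an equal key, so a non-null result means the same partition values appeared twice.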