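Recently, as the project has gone deeper, I've noticed a drawback in the Hive metastore: its metadata operations are coupled with the code that operates on the physical cluster, which makes it hard to extend. For example, create_table validates and creates the table path in the same call: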

    // Views have no data directory; every other table type gets a resolved location
    if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
      if (tbl.getSd().getLocation() == null
          || tbl.getSd().getLocation().isEmpty()) {
        // No location given: derive it from the database's warehouse path
        tblPath = wh.getTablePath(
            ms.getDatabase(tbl.getDbName()), tbl.getTableName());
      } else {
        if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
          LOG.warn("Location: " + tbl.getSd().getLocation()
              + " specified for non-external table:" + tbl.getTableName());
        }
        // Qualify the user-supplied location with the filesystem's scheme and authority
        tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
      }
      tbl.getSd().setLocation(tblPath.toString());
    }

    // Physical side effect inside a metadata call: create the directory on the filesystem
    if (tblPath != null) {
      if (!wh.isDir(tblPath)) {
        if (!wh.mkdirs(tblPath, true)) {
          throw new MetaException(tblPath
              + " is not a directory or unable to create one");
        }
        madeDir = true;
      }
    }
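The location rule in this excerpt can be restated as a small pure function. The sketch below is a simplified, hypothetical version. Paths are plain strings instead of Hadoop `Path` objects. The names `TablePathResolver` and `resolve` are invented for illustration. The non-native-table check is left out. The sketch assumes that the `default` database maps to the warehouse root and that other databases map to `<root>/<db>.db`. Real Hive reads the location stored on the database object, so this derivation is only an approximation. The sketch covers the metadata half only; the `wh.mkdirs` call that follows in the excerpt is the physical half.

```java
import java.util.logging.Logger;

/**
 * Hypothetical sketch of the table-location rule from the create_table excerpt.
 * Plain strings replace Hadoop Path / Warehouse; the non-native-table check is omitted.
 */
public class TablePathResolver {
    private static final Logger LOG = Logger.getLogger(TablePathResolver.class.getName());

    /**
     * Returns the table directory, or null for a view (views get no directory).
     *
     * @param warehouseRoot assumed warehouse root, e.g. "/user/hive/warehouse"
     * @param tableType     "MANAGED_TABLE", "EXTERNAL_TABLE" or "VIRTUAL_VIEW"
     * @param location      user-supplied location; may be null or empty
     */
    public static String resolve(String warehouseRoot, String dbName, String tableName,
                                 String tableType, String location) {
        if ("VIRTUAL_VIEW".equals(tableType)) {
            return null;
        }
        if (location == null || location.isEmpty()) {
            // Assumed layout: "default" lives at the root, other databases under <db>.db
            String dbPath = "default".equalsIgnoreCase(dbName)
                    ? warehouseRoot
                    : warehouseRoot + "/" + dbName + ".db";
            return dbPath + "/" + tableName;
        }
        if (!"EXTERNAL_TABLE".equals(tableType)) {
            // Same warning as the excerpt: explicit location on a managed table
            LOG.warning("Location: " + location + " specified for non-external table:" + tableName);
        }
        return location;
    }

    public static void main(String[] args) {
        System.out.println(resolve("/user/hive/warehouse", "sales", "orders", "MANAGED_TABLE", null));
        System.out.println(resolve("/user/hive/warehouse", "default", "logs", "MANAGED_TABLE", ""));
    }
}
```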

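   So is this why the metastore can't serve as the single home for all metadata? At a high level, the Hive metastore code is essentially create, read, update and delete operations on metadata. As we saw in the previous walkthrough, the init method of HiveMetaStore (specifically its HMSHandler) creates three kinds of listeners: MetaStorePreEventListener, MetaStoreEventListener and MetaStoreEndFunctionListener. They listen to and record every step of each operation. The same method also instantiates a Warehouse object, which carries out the physical (filesystem) operations.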
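The hypothetical sketch below shows how the three listener types divide the work around a single call. The interfaces are simplified stand-ins, not the real signatures of MetaStorePreEventListener, MetaStoreEventListener or MetaStoreEndFunctionListener. All three hook points are folded into one method here. Hive itself fires the end-function listeners separately, when each API method finishes. A pre-event listener can veto the operation by throwing, which is how an authorization check can plug in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical sketch: pre-event, event and end-function listeners around one call.
 * The interfaces are simplified stand-ins, not Hive's real listener classes.
 */
public class ListenerDemo {
    interface PreEventListener { void onPreEvent(String function); }
    interface EventListener { void onEvent(String function, boolean success); }
    interface EndFunctionListener { void onEndFunction(String function, boolean success); }

    final List<PreEventListener> preListeners = new ArrayList<>();
    final List<EventListener> listeners = new ArrayList<>();
    final List<EndFunctionListener> endListeners = new ArrayList<>();

    /** Runs one metadata change; a pre-event listener may veto it by throwing. */
    boolean run(String function, BooleanSupplier metadataChange) {
        boolean success = false;
        try {
            for (PreEventListener l : preListeners) {
                l.onPreEvent(function); // e.g. an authorization check that throws to reject
            }
            success = metadataChange.getAsBoolean();
        } finally {
            // Event listeners are told whether the change succeeded
            for (EventListener l : listeners) {
                l.onEvent(function, success);
            }
            // End-function listeners see every call, successful or not
            for (EndFunctionListener l : endListeners) {
                l.onEndFunction(function, success);
            }
        }
        return success;
    }

    public static void main(String[] args) {
        ListenerDemo demo = new ListenerDemo();
        demo.preListeners.add(f -> System.out.println("pre: " + f));
        demo.listeners.add((f, ok) -> System.out.println("event: " + f + " success=" + ok));
        demo.endListeners.add((f, ok) -> System.out.println("end: " + f + " success=" + ok));
        demo.run("add_partitions", () -> true);
    }
}
```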

  

    public void init() throws MetaException {
      // Which RawStore implementation (ObjectStore by default) backs the metadata
      rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
      // Instantiate the init hooks listed in METASTORE_INIT_HOOKS and call each one
      initListeners = MetaStoreUtils.getMetaStoreListeners(
          MetaStoreInitListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
      for (MetaStoreInitListener singleInitListener: initListeners) {
          MetaStoreInitContext context = new MetaStoreInitContext();
          singleInitListener.onInit(context);
      }

      // The alter handler is also pluggable, loaded by class name from configuration
      String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
          HiveAlterHandler.class.getName());
      alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
          alterHandlerName), hiveConf);
      // Warehouse wraps the physical filesystem operations
      wh = new Warehouse(hiveConf);
      // ...
    }
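`init` does not hard-code its hooks. It reads class names from configuration (METASTORE_INIT_HOOKS, `hive.metastore.alter.impl`) and instantiates them by reflection. The sketch below shows that general technique under simplifying assumptions. The `InitListener` interface and the `HookLoader` class are hypothetical. Listeners are built with a no-argument constructor, whereas Hive's listeners receive the Configuration in their constructor. A configured class of the wrong type fails fast instead of being silently skipped.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of loading listeners named in a comma-separated config value,
 * in the spirit of MetaStoreUtils.getMetaStoreListeners (not Hive's actual code).
 */
public class HookLoader {
    /** Stand-in for MetaStoreInitListener. */
    public interface InitListener { void onInit(); }

    /** Example listener a config value could name. */
    public static class CountingInitListener implements InitListener {
        static int calls = 0;
        @Override public void onInit() { calls++; }
    }

    public static <T> List<T> loadListeners(Class<T> type, String classNames) {
        List<T> result = new ArrayList<>();
        if (classNames == null || classNames.trim().isEmpty()) {
            return result; // no hooks configured
        }
        for (String name : classNames.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                // asSubclass throws ClassCastException if the class is not a T
                Class<? extends T> clazz = Class.forName(trimmed).asSubclass(type);
                T instance = clazz.getDeclaredConstructor().newInstance();
                result.add(instance);
            } catch (ReflectiveOperationException | ClassCastException ex) {
                throw new IllegalArgumentException("Cannot load listener " + trimmed, ex);
            }
        }
        return result;
    }
}
```

Usage mirrors the loop in `init`: load the configured classes, then call `onInit()` on each one.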

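 Next, let's follow the metadata life cycle and look at the life cycle of a Partition. In HiveMetaStoreClient, add_partition is the entry point. This operation runs, for example, when an insert overwrite writes into a table that is partitioned by one of its columns, such as dt=20170830. There is also add_partitions: after you create a partitioned table and load data into it, several partition paths are created at once. Below we use add_partitions as the example: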
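A partition spec such as dt=20170830 maps to a directory under the table location, so an add_partitions call for several partitions produces several paths. The hypothetical helper below builds those paths with the key=value/key=value layout. It omits the escaping of special characters that Hive's Warehouse.makePartName applies, so treat it as an illustration only.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: partition keys/values to a directory path (no escaping). */
public class PartitionPaths {
    /** Builds "k1=v1/k2=v2" from parallel key and value lists. */
    public static String partName(List<String> keys, List<String> values) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException(
                    "Expected " + keys.size() + " values but got " + values.size());
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            if (i > 0) {
                sb.append('/');
            }
            sb.append(keys.get(i)).append('=').append(values.get(i));
        }
        return sb.toString();
    }

    /** Appends the partition name to the table location. */
    public static String partitionLocation(String tableLocation, List<String> keys, List<String> values) {
        return tableLocation + "/" + partName(keys, values);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("dt");
        // Two partitions added in one batch yield two directories
        for (String dt : Arrays.asList("20170830", "20170831")) {
            System.out.println(partitionLocation("/user/hive/warehouse/sales.db/orders",
                    keys, Arrays.asList(dt)));
        }
    }
}
```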

    public int add_partitions(List<Partition> new_parts)
        throws InvalidObjectException, AlreadyExistsException, MetaException,
        TException {
      return client.add_partitions(new_parts);
    }

    @Override
    public List<Partition> add_partitions(
        List<Partition> parts, boolean ifNotExists, boolean needResults)
        throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
      // Nothing to add: skip the RPC entirely
      if (parts.isEmpty()) {
        return needResults ? new ArrayList<Partition>() : null;
      }
      // Database and table are taken from the first partition; the server checks the rest
      Partition part = parts.get(0);
      AddPartitionsRequest req = new AddPartitionsRequest(
          part.getDbName(), part.getTableName(), parts, ifNotExists);
      req.setNeedResult(needResults);
      AddPartitionsResult result = client.add_partitions_req(req);
      // Returned partitions pass through the client-side filter hook
      return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
    }

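  Here `client` is a ThriftHiveMetastore.Iface. When the metastore runs embedded, the implementation is the handler inside HiveMetaStore, which is created and set up through the init method shown earlier. With a remote metastore, `client` is a Thrift client stub, and the same handler does the work on the server. The partitions are then wrapped in an AddPartitionsRequest. This class carries nothing beyond the partitions and a few flags. The benefit is that the database name, table name and partition list are packaged once, so later steps use the request object directly instead of extracting DbName, TableName and so on from a partition again. Next, let's follow client.add_partitions_req. On the handler side it delegates to add_partitions_core, shown below. Be warned: the code is long, so we will go through it step by step.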
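The client-side logic above is small enough to restate on its own. In this hypothetical sketch, `Partition`, `AddPartitionsRequest` and `Server` are minimal stand-ins for the Thrift-generated types and the RPC, not their real definitions. The sketch shows only the shape of the call. An empty list short-circuits, the database and table names are read once from the first partition, and everything travels in one request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified mirror of the client-side add_partitions. */
public class AddPartitionsClientSketch {
    /** Stand-in for the Thrift Partition: only the fields this example needs. */
    static final class Partition {
        final String dbName;
        final String tableName;
        final List<String> values;

        Partition(String dbName, String tableName, List<String> values) {
            this.dbName = dbName;
            this.tableName = tableName;
            this.values = values;
        }
    }

    /** Stand-in for AddPartitionsRequest: db/table are captured once, up front. */
    static final class AddPartitionsRequest {
        final String dbName;
        final String tblName;
        final List<Partition> parts;
        final boolean ifNotExists;
        boolean needResult = true;

        AddPartitionsRequest(String dbName, String tblName, List<Partition> parts, boolean ifNotExists) {
            this.dbName = dbName;
            this.tblName = tblName;
            this.parts = parts;
            this.ifNotExists = ifNotExists;
        }
    }

    /** Stand-in for the remote side that receives the request. */
    interface Server {
        List<Partition> addPartitionsReq(AddPartitionsRequest req);
    }

    static List<Partition> addPartitions(Server server, List<Partition> parts,
                                         boolean ifNotExists, boolean needResults) {
        if (parts.isEmpty()) {
            return needResults ? new ArrayList<Partition>() : null; // no RPC at all
        }
        Partition first = parts.get(0);
        AddPartitionsRequest req =
                new AddPartitionsRequest(first.dbName, first.tableName, parts, ifNotExists);
        req.needResult = needResults;
        List<Partition> added = server.addPartitionsReq(req);
        return needResults ? added : null; // the real client also applies filterHook here
    }
}
```

Because the request names its target table explicitly, the server does not have to trust that every partition agrees. The server-side code checks each partition against that table.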

  

    private List<Partition> add_partitions_core(
        RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
            throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
      logInfo("add_partitions");
      boolean success = false;
      // Ensures that the list doesn't have dups, and keeps track of directories we have created.
      Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
      List<Partition> result = new ArrayList<Partition>();
      List<Partition> existingParts = null;
      Table tbl = null;
      try {
        // All metadata writes below belong to one RawStore transaction
        ms.openTransaction();
        tbl = ms.getTable(dbName, tblName);
        if (tbl == null) {
          throw new InvalidObjectException("Unable to add partitions because "
              + "database or table " + dbName + "." + tblName + " does not exist");
        }

        // Pre-event listeners run first and can reject the whole request
        if (!parts.isEmpty()) {
          firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
        }

        for (Partition part : parts) {
          if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
            throw new MetaException("Partition does not belong to target table "
                + dbName + "." + tblName + ": " + part);
          }
          // false: the partition already exists and ifNotExists allows skipping it
          boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
          if (!shouldAdd) {
            if (existingParts == null) {
              existingParts = new ArrayList<Partition>();
            }
            existingParts.add(part);
            LOG.info("Not adding partition " + part + " as it already exists");
            continue;
          }
          // Physical step: madeDir is true only if this call created the directory
          boolean madeDir = createLocationForAddedPartition(tbl, part);
          if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
            // Technically, for ifNotExists case, we could insert one and discard the other
            // because the first one now "exists", but it seems better to report the problem
            // upstream as such a command doesn't make sense.
            throw new MetaException("Duplicate partitions in the list: " + part);
          }
          initializeAddedPartition(tbl, part, madeDir);
          result.add(part);
        }
        // Persist all new partitions in one call, then commit the transaction
        if (!result.isEmpty()) {
          success = ms.addPartitions(dbName, tblName, result);
        } else {
          success = true;
        }
        success = success && ms.commitTransaction();
      } finally {
        if (!success) {
          // Roll back the metadata, then delete only the directories this call created
          ms.rollbackTransaction();
          for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
            if (e.getValue()) {
              wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
              // we just created this directory - it's not a case of pre-creation, so we nuke
            }
          }
          fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
        } else {
          // Post-event listeners learn which partitions were added and which were skipped
          fireMetaStoreAddPartitionEvent(tbl, result, null, true);
          if (existingParts != null) {
            // The request has succeeded but we failed to add these partitions.
            fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
          }
        }
      }
      return result;
    }
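The core pattern in add_partitions_core has three parts. Metadata is staged inside a transaction. The method records, for each partition, whether this call created its directory. On failure, it rolls back the metadata and deletes only those directories. The hypothetical in-memory sketch below reproduces that pattern, with two sets standing in for the RawStore and the filesystem. It has no real transaction, no listeners and no Hive types.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory sketch of the add_partitions_core pattern.
 * Two sets stand in for the RawStore (committed metadata) and the filesystem.
 */
public class AddPartitionsCoreSketch {
    final Set<String> committedPartitions = new HashSet<>(); // stand-in for RawStore
    final Set<String> directories = new HashSet<>();         // stand-in for Warehouse/HDFS

    /** Adds partitions such as "dt=20170830" and returns the names actually added. */
    List<String> addPartitions(String tableLocation, List<String> partNames, boolean ifNotExists) {
        boolean success = false;
        // partition name -> whether THIS call created its directory
        Map<String, Boolean> added = new LinkedHashMap<>();
        List<String> result = new ArrayList<>();
        try {
            for (String name : partNames) {
                if (committedPartitions.contains(name)) {
                    if (!ifNotExists) {
                        throw new IllegalStateException("Partition already exists: " + name);
                    }
                    continue; // skipped, like startAddPartition returning false
                }
                if (added.containsKey(name)) {
                    throw new IllegalArgumentException("Duplicate partitions in the list: " + name);
                }
                // Set.add returns false when the directory was already there (pre-created)
                boolean madeDir = directories.add(tableLocation + "/" + name);
                added.put(name, madeDir);
                result.add(name);
            }
            committedPartitions.addAll(result); // "commit": new metadata becomes visible together
            success = true;
        } finally {
            if (!success) {
                // Nothing was committed; undo only the directories this call created
                for (Map.Entry<String, Boolean> e : added.entrySet()) {
                    if (e.getValue()) {
                        directories.remove(tableLocation + "/" + e.getKey());
                    }
                }
            }
        }
        return result;
    }
}
```

One detail differs on purpose: the sketch checks for a duplicate before it creates the directory. In the excerpt, `addedPartitions.put(...)` has already replaced the earlier entry's value by the time the duplicate is reported. As a result, the `true` recorded for the first copy can be overwritten by the second copy's value. Checking `containsKey` first keeps the rollback bookkeeping accurate.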