【问题标题】:Get hive partition from Spark dataset从 Spark 数据集中获取 hive 分区
【发布时间】:2018-03-23 19:56:50
【问题描述】:

我正在研究在写入 S3 后自动将表和分区注册到配置单元元存储的东西。

在注册所有分区之前,我需要知道所有分区值。现在我正在做ds.select(partitionColumn).distinct().collectAsList(); 来获取所有分区值。

有没有更好的方法从我的数据集中获取分区值?

【问题讨论】:

  • AWS Glue 已经为您完成了这项工作。
  • 我不知道有更好的解决方案,我也是这样做的
  • @ThiagoBaldim 我们看过 AWS Glue,但它似乎不允许我们将它用作外部产品的元存储服务。像 Tableau、Databricks 等...
  • @RaphaelRoth 是的,它有效。但是如果数据集很大,那么它需要一段时间才能完成。我想知道,因为我首先调用ds.write.partitionBy.save,它已经将数据写入所有分区。不过确实想出了办法。
  • 确实如此,这可以让您轻松使用 EMR。但是如果你需要从中获取信息,你可以尝试在 Boto 之上构建一些东西。

标签: apache-spark hive


【解决方案1】:

看完Spark源码,特别是org.apache.spark.sql.execution.command.ddl.scala中的AlterTableRecoverPartitionsCommand,就是ALTER TABLE RECOVER PARTITIONS的Spark实现。它扫描所有分区,然后注册它们。

所以,这是相同的想法,从我们刚刚写入的位置扫描所有分区。

从中获取键名,然后从中提取分区名称/值。

这里是获取路径的代码sn-p。

String location = "s3n://somebucket/somefolder/dateid=20171010/";
Path root = new Path(location);

Configuration hadoopConf = sparkSession.sessionState().newHadoopConf();
FileSystem fs = root.getFileSystem(hadoopConf);

JobConf jobConf = new JobConf(hadoopConf, this.getClass());
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf);

FileStatus[] fileStatuses = fs.listStatus(root, path -> {
    String name = path.getName();
    if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
        return pathFilter == null || pathFilter.accept(path);
    } else {
        return false;
    }
});

for(FileStatus fileStatus: fileStatuses) {
    System.out.println(fileStatus.getPath().getName());
}

【讨论】:

  • 基于这种方法,我们可以扩展当前的过滤器来做额外的工作。权衡是,如果 SaveMode 不是 Overwrite,我们返回的路径并不完全是我们写入的路径。就我而言,我现在仅将其用于覆盖模式。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-27
  • 1970-01-01
相关资源
最近更新 更多