【问题标题】:How to use ExternalCatalog.listPartitions() with Java如何在 Java 中使用 ExternalCatalog.listPartitions()
【发布时间】:2023-04-11 01:39:02
【问题描述】:

我是 Java 新手。我想在 hiveTable 中删除分区。我想使用SparkSession.ExternalCatalog().listPartitionsSparkSession.ExternalCatalog().dropPartitions

我在 scala How to truncate data and drop all partitions from a Hive table using Spark 上看到了这个方法 但我不明白如何在 Java 上运行它。它是 etl 进程的一部分,我想了解如何在 Java 上处理它。

我的代码失败了,因为对如何使用数据类型进行操作并将其转换为 java 的误解。需要什么类型的对象,如何理解返回API的数据。

我的代码示例:

ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");

它失败了,因为:

类 org.apache.spark.sql.catalog.ExternalCatalog 中的方法 listPartitions 不能应用于给定类型。

我无法击败它,因为关于 api (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions) 和 java 知识的信息较少,而且我找到的所有示例都写在 scala 上。 最后,我需要将这段适用于 scala 的代码转换为 java:

def dropPartitions(spark:SparkSession, shema:String, table:String, need_date:String):Unit = {
        val cat = spark.sharedState.externalCatalog
        val partit = cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")).flatten
        val filteredPartit = partita.filter(_<dt).map(x => Map("partition_field" -> x))
        cat.dropPartitions(
    shema
    ,table
    ,filteredPartitions
    ,ignoreIfNotExists=true
    ,purge=false
    ,retainData=false
    }

拜托,如果您知道如何处理它,您可以在这件事上提供帮助:

  1. Java 中的一些代码示例,用于编写我自己的容器来操作来自 externalCatalog 的数据
  2. 这个 api 中使用了什么数据结构以及一些理论资源,可以帮助我理解它们如何与 java 一起使用
  3. scala 代码字符串中的含义是什么:cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")).flatten? tnx

更新中 非常感谢您的反馈@jpg。我会尽力。我有很大的 etl 任务,它的目标是每周写入一次动态分区表数据。制作此数据集市的业务规则:(sysdate - 90 天)。因此,我想在公共访问模式的目标表中删除分区数组(按天)。而且我已经阅读了删除分区的正确方法-使用externalCatalog。由于这个项目的历史传统,我应该使用 java)并尝试了解如何最有效地做到这一点。 externalCatalog 的一些方法我可以通过 System.out.println() 返回到终端: externalCatalog.tableExists()、externalCatalog.listTables() 和 externalCatalog.getTable 的方法。但我不明白如何处理 externalCatalog.listPartitions。

再更新一次 大家好。我的任务向前迈进了一步: 现在我可以在列表分区的终端缓冲区中返回:

ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("schema", "table", Option.empty()); // work! null or miss parameter fail program
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for CatalogTablePartition catalogTablePartition: catalogTablePartitions) {
    System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values"));//retutn me value of partition like "Some([validation_date=2021-07-01])"
)

但这是另一个问题。 我可以在方法 ec.dropPartitions 中返回这个 api 中的值,比如 Java List。它需要 3d 参数 Seq> 结构。在这种情况下我也无法过滤分区 - 在我的梦想中,我想按日期参数过滤分区的值,然后将其删除。 如果有人知道如何用这个 api 编写 map 方法来返回它,就像在我的 scala 示例中一样,请帮助我。

【问题讨论】:

  • 请编辑问题以将其限制为具有足够详细信息的特定问题,以确定适当的答案。

标签: java scala apache-spark apache-spark-sql hive-metastore


【解决方案1】:

我自己解决了。也许它会帮助某人。

public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
    ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
    Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
    List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
    List<Map<String, String>> allPartList = catalogTablePartitions.stream.
        .map(s -> s.spec.seq())
        .collect(Collectors.toList());
    List<String> datePartDel = 
        allPartList.stream()
            .map(x -> x.get("partition_name").get())
            .sorted()
            .collect(Collectors.toList());
    String lessThisDateDelete = date_less_delete;
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    LocalDate date = LocalDate.parse(lessThisDateDelete, formatter);
    
    List<String> filteredDates = datePartDel.stream()
        .map(s -> LocalDate.parse(s, formatter))
        .filter(d -> d.isBefore(date))
        .map(s -> s.toString())
        .collect(Collectors.toList());
    
    for (String seeDate : filteredDates)) {
        List<Map<String, String>> elem = allPartList.stream()
            .filter(x -> x.get("partition_name").get().equals(seeDate))
            .collect(Collectors.toList());
        Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala.toSeq();
        ec.dropPartitions(
        db_name
        , table_name
        , seqElem
        , true
        , false
        , false
        );
    } 
}

【讨论】:

    猜你喜欢
    • 2013-02-20
    • 2014-12-09
    • 2016-03-28
    • 2014-07-04
    • 2018-01-20
    • 2017-01-15
    • 2018-11-12
    • 2011-02-04
    • 2014-07-27
    相关资源
    最近更新 更多