【问题标题】:Drop partitions from Spark从 Spark 中删除分区
【发布时间】:2021-05-02 10:38:17
【问题描述】:

我正在使用 Java-Spark (Spark 2.2.0)。

我正在尝试按如下方式删除 Hive 分区:

spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"

并得到以下异常:

org.apache.spark.sql.catalyst.parser.ParseException: 不匹配的输入 '

我知道这是未解决的问题 ALTER TABLE DROP PARTITION should support comparators,应该在我的版本中修复,但我仍然遇到异常。

从 Spark 中删除分区的替代方法是什么?有另一种实现吗?

谢谢。

【问题讨论】:

    标签: apache-spark hive


    【解决方案1】:

    似乎暂时没有办法做到这一点。 如SPARK-14922所示,本次修复的目标版本为3.0.0,目前仍在进行中。

    因此,我认为有两种可能的解决方法。

    让我们使用 Spark 2.4.3 设置问题:

    // We create the table
    spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
    
    // Enable dynamic partitioning 
    spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
    
    // Insert some dummy records
    (1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
    
    // Verify inserts
    spark.table("potato").count // 9 records
    

    现在...尝试从 spark 内部删除单个分区!

    spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
    spark.table("potato").count // 8 records
    

    尝试删除多个分区不起作用。

    spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")
    
    org.apache.spark.sql.catalyst.parser.ParseException:
    Found duplicate keys 'hour'.(line 1, pos 34)
    
    == SQL ==
    ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
    ----------------------------------^^^
    

    使用比较运算符删除一系列分区也不起作用。

    spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")
    
    org.apache.spark.sql.catalyst.parser.ParseException:
    mismatched input '<=' expecting {')', ','}(line 1, pos 49)
    
    == SQL ==
    ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
    -------------------------------------------------^^^
    

    这应该是因为分区列是一个字符串,而我们正在使用比较运算符。

    我找到的解决办法是:

    1. 获取分区列表并有条件地过滤它们。
    2. 要么逐个删除各个分区,要么将它们作为[Map[String,String] (TablePartitionSpec) 的序列传递给目录的dropPartitions 函数。

    第 1 步:

    // Get External Catalog
    val catalog = spark.sharedState.externalCatalog
    
    // Get the spec from the list of partitions 
    val partitions = catalog.listPartitions("default", "potato").map(_.spec)
    
    // Filter according to the condition you attempted.
    val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
                                               .map(t => Map(t._1 -> t._2))
    

    第 2 步:

    我们将每个参数元组传递给单独的 ALTER TABLE DROP PARTITION 语句。

    filteredPartitions.flatten.foreach(t => 
         spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
    spark.table("potato").count // 6 records
    

    或者将它们传递给目录的dropPartition 函数。

    // If you purge data, it gets deleted immediately and isn't moved to trash.
    // This takes precedence over retainData, so even if you retainData but purge,
    // your data is gone.
    catalog.dropPartitions("default", "potato", filteredPartitions,
                           ignoreIfNotExists=true, purge=true, retainData=false)
    spark.table("potato").count // 6 records
    

    我希望这会有所帮助。如果您对 Spark 2.x 有更好的解决方案,请告诉我。

    【讨论】:

      【解决方案2】:

      您可以对 Spark 编程进行同样的操作。此外,它在 Spark 2 、 2.1 和 2.2 中也未修复,用于参考 https://issues.apache.org/jira/browse/SPARK-14922

          Steps 
      
              1 . Create hive context 
              2 . Get the table for getTable method from the hive context and you need to pass dbName, tableName and a boolean value if any error
              3 . From table Object hive.getPartitions(table) you can get the partitions from hive context (you need to decide which partitions you are going delete )
                  4 . You can remove partitions using dropPartition with partition values , table name and db info (hive.dropPartition) 
      
          hiveContext.getPartitions(table)
          hiveContext.dropPartition(dbName, tableName, partition.getValues(), true)
      
      
      You need to validate the partition name and check whether it needs to be deleted or not (you need to write custom method ).
      
             Or you can get the partition list sql using show partitions and from there also you can use drop partition to remove it.
      
      This may give you some pointers .
      

      【讨论】:

      • 嗨,在调用 getTable 后执行第 2 步我如何获取分区/执行 dropPartition?我找不到那些方法
      【解决方案3】:

      pyspark 人的解决方案

      1. 获取表的所有分区。

      2. 将分区列转换为分区列表。

      3. 清理分区以仅获取值。

      4. 具有所需条件的过滤列表。

      5. 对所有过滤列表执行 Alter table 操作。 请在下面找到相应的pyspark格式代码

         partitions = spark.sql("SHOW PARTITIONS potato")
         listpartitions = list(partitions.select('partition').toPandas()['partition'])
         cleanpartitions = [ i.split('=')[1] for i in listpartitions]
         filtered = [i for i in cleanpartitions if i < str(20180910)]
         for i in filtered:
             spark.sql("alter table potato DROP IF EXISTS PARTITION (date = '"+i+"')")
        

      【讨论】:

        【解决方案4】:

        我认为这里的问题是您使用 '&lt;' (lessthen) 符号如此不经意地您的数据必须是数字或日期类型的形式,但您将其放入 '' 意味着它采用字符串格式的值。我建议你必须检查分区的格式。可能您必须以正确的日期格式进行转换。

        猜你喜欢
        • 2016-01-21
        • 2017-06-10
        • 2021-03-02
        • 2017-07-22
        • 1970-01-01
        • 2018-04-08
        • 1970-01-01
        • 2020-01-07
        • 1970-01-01
        相关资源
        最近更新 更多