似乎暂时没有办法做到这一点。
如SPARK-14922所示,本次修复的目标版本为3.0.0,目前仍在进行中。
因此,我认为有两种可能的解决方法。
让我们使用 Spark 2.4.3 设置问题:
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records
现在...尝试从 spark 内部删除单个分区!
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records
尝试删除多个分区不起作用。
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^
使用比较运算符删除一系列分区也不起作用。
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^
这应该是因为分区列是一个字符串,而我们正在使用比较运算符。
我找到的解决办法是:
- 获取分区列表并有条件地过滤它们。
- 要么逐个删除各个分区,要么将它们作为
[Map[String,String] (TablePartitionSpec) 的序列传递给目录的dropPartitions 函数。
第 1 步:
// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of partitions
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
.map(t => Map(t._1 -> t._2))
第 2 步:
我们将每个参数元组传递给单独的 ALTER TABLE DROP PARTITION 语句。
filteredPartitions.flatten.foreach(t =>
spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records
或者将它们传递给目录的dropPartition 函数。
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records
我希望这会有所帮助。如果您对 Spark 2.x 有更好的解决方案,请告诉我。