【问题标题】:How to delete a particular month from a parquet file partitioned by month如何从按月分区的镶木地板文件中删除特定月份
【发布时间】:2019-12-22 12:26:06
【问题描述】:

我有过去 5 年的 monthly 收入数据,并且我在 append 模式下以 parquet 格式存储各个月份的数据帧,但 partitioned by month 列。这是下面的伪代码 -

def Revenue(filename):
    df = spark.read.load(filename)
    .
    .
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')

df 每月以parquet 格式存储,如下所示 -

问题:如何删除特定月份对应的parquet文件夹?

一种方法是将所有这些parquet 文件加载到一个大的df 中,然后使用.where() 子句过滤掉该特定月份,然后将其保存回parquet 格式partitionBy 月份@ 987654341@模式,像这样-

# If we want to remove data from Feb, 2015
df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')

但是,这种方法相当麻烦。

另一种方法是直接删除该特定月份的文件夹,但我不确定这是否是正确的处理方式,以免我们以不可预见的方式更改metadata

删除特定月份的parquet 数据的正确方法是什么?

【问题讨论】:

  • 如果您稍后选择,这里有一个很好的讨论链接,但这不是您原始问题的答案。贴出来仅供参考。 stackoverflow.com/questions/38318513/…
  • @vikrantrana 非常感谢 Vikrant 向我推荐该链接。让我试着理解它。
  • 请看下面的答案。它可以作为指向您原始问题的指针。您必须根据镶木地板格式或分区列进行少量更改。如果您找到使用 spark 函数的方法,也请告诉我。
  • 看来这个问题已经争论了很久了。不太确定,但可能是。 stackoverflow.com/questions/48090352/…

标签: python apache-spark pyspark parquet


【解决方案1】:

Spark 支持删除分区,包括数据和元数据。
引用scala代码注释

/**
 * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
 *
 * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 */

在您的情况下,没有后备表。 我们可以将数据帧注册为临时表并使用上述语法(temp table documentation)

通过 pyspark,我们可以使用link 中的语法运行 SQL 示例:

df = spark.read.format('parquet').load('Revenue.parquet'). registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")

【讨论】:

  • 你需要做第一部分吗? df = ...我倾向于在脚本中这样做。如果是这样的话很有趣。
  • 我们可以像spark.apache.org/docs/latest/…中提到的那样直接写SQL。我不确定ALTER TABLE 是否符合链接中提到的语法。
  • @DaRkMaN 嗨!从您的回答中得到提示,我发现带有PURGE 选项的Alter TABLE 只会在表为internal table 时删除HDFS 上的数据,而不是在external table 时。我的是external 一个。如何从 HDFS 中删除相应的数据?
  • 正如您在评论中提到的那样,此ALTER TABLE... 代码不适合 PySpark 框架。可能它适用于 HIVE,但我必须在 PySpark 中进行。感谢您的努力。
【解决方案2】:

以下语句只会删除与分区信息相关的元数据。

ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");

如果您还想删除数据,则需要将 hive 外部表的 tblproperties 设置为 False。它会将您的配置单元表设置为托管表。

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='FALSE');

您可以将其设置回外部表。

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='TRUE');

我尝试使用 spark session 设置给定属性,但遇到了一些问题。

 spark.sql("""alter table db.test_external set tblproperties ("EXTERNAL"="TRUE")""")
pyspark.sql.utils.AnalysisException: u"Cannot set or change the preserved property key: 'EXTERNAL';"

我确信一定有办法做到这一点。我最终使用了python。我在 pyspark 中定义了下面的函数,它完成了这项工作。

query=""" hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE");ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");' """

def delete_partition():
        print("I am here")
        import subprocess
        import sys
        p=subprocess.Popen(query,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1) 

>>> delete_partition()

这将同时删除元数据和数据。 笔记。我已经使用 Hive ORC 外​​部分区表对此进行了测试,该表在加载日期进行分区

# Partition Information
# col_name              data_type               comment

loaded_date             string

更新: 基本上,您的数据位于名为

的子目录中的 hdfs 位置
/Revenue/month=2015-02-01
/Revenue/month=2015-03-01
/Revenue/month=2015-03-01

等等

def delete_partition(month_delete):
      print("I am here")
      hdfs_path="/some_hdfs_location/Revenue/month="
      final_path=hdfs_path+month_delete
      import subprocess
      subprocess.call(["hadoop", "fs", "-rm", "-r", final_path])
      print("got deleted")

delete_partition("2015-02-01")

【讨论】:

  • 嗯,周一试试,然后通知你。
  • 好吧,我试着调查了一下,但是由于我在HIVE中没有代码,而是直接在PySparkJupyter,所以我不知道db会是什么我的情况。
  • 哦,好吧..您将数据帧直接保存到某个 hdfs 位置。我会检查这个。谢谢
  • 嗨,是的,你是完全正确的。我在 HDFS 上将df 保存为parquet 格式,由month 分区,如问题所示。我正在直接从那里加载我的df
  • 快乐维克兰特 :)
猜你喜欢
  • 2020-12-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-06-22
  • 2017-11-11
  • 2019-01-24
  • 2021-06-13
  • 2019-10-27
相关资源
最近更新 更多