【问题标题】:Spark dataframe checkpoint cleanupSpark 数据帧检查点清理
【发布时间】:2020-02-02 02:00:06
【问题描述】:

我在 spark 中有一个数据帧,其中已加载来自 Hive 的整个分区,我需要在对数据进行一些修改后打破沿袭以覆盖相同的分区。但是,当火花作业完成时,我留下了来自 HDFS 上检查点的数据。为什么 Spark 不自行清理它或者我缺少什么?

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition").equal(2))

// ... transformations to the dataframe

val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")

在此之后,我在 HDFS 上有了这个文件:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

每次我运行 spark 作业时,我都会得到一个新目录,其中包含一个新的唯一 ID,其中包含数据帧中每个 RDD 的文件。

【问题讨论】:

标签: scala apache-spark hive


【解决方案1】:

Spark 具有用于检查点文件清理的隐式机制。

在 spark-defaults.conf 中添加此属性。

spark.cleaner.referenceTracking.cleanCheckpoints  true #Default is false

您可以在 Spark official configuration page中找到更多关于 Spark 配置的信息

如果您想从 HDFS 中删除检查点目录,可以使用 Python 将其删除,在脚本末尾您可以使用此命令 rmtree

此属性spark.cleaner.referenceTracking.cleanCheckpointstrue 一样,允许清理程序删除检查点目录中的旧检查点文件。

【讨论】:

  • 嗨,我尝试将它添加到我的 spark2-shell 命令中,我可以看到它在 spark 历史服务器中设置,但是当我运行我的代码并使用“sys.exit”关闭 shell 时检查点文件夹仍在 HDFS 上。不能为我提交的单个 Spark 作业指定吗?
  • @aweis 检查点永远不会从 HDFS 中删除。如果要从 HDFS 中删除检查点目录,可以使用 Python 将其删除,在脚本末尾可以使用此命令 rmtree。如果spark.cleaner.referenceTracking.cleanCheckpoints 为真,Spark 将删除检查点目录中旧的检查点文件。
  • 感谢您的建议,但是手动删除检查点位置有点违背目的,因为我需要为每个检查点 DF 创建唯一文件夹以确保没有竞争条件(就像 spark 在检查点位置)。必须有一种配置 spark 的方法,以便在完成后清理任何数据溢出 - 这可能有许多 GB 留在 HDFS 上
  • @aweis 你现在别无选择,要么使用spark.cleaner.referenceTracking.cleanCheckpoints,它允许 Spark 在旧数据中应用保留策略,但你无法控制它或手动清理过程结束时的目录,因此您将确保不会在 HDFS 中留下任何文件。我是你的情况,因为据我了解它不是流应用程序,我建议手动清理文件,这很简单,你可以控制。另一方面,如果您有流式应用程序,最好让 Spark 处理检查点文件
猜你喜欢
  • 1970-01-01
  • 2018-06-22
  • 2017-12-23
  • 1970-01-01
  • 2016-01-30
  • 1970-01-01
  • 2017-01-24
  • 2017-05-07
  • 2017-02-10
相关资源
最近更新 更多