【问题标题】:pyspark with Hive, append will add to existing partition and duplicate data带有 Hive 的 pyspark,追加将添加到现有分区并重复数据
【发布时间】:2019-09-19 06:50:56
【问题描述】:

我目前正在使用 adwords api,我必须处理 1、7 和 30 天的数据。 因此,spark 作业是基本的,加载 csv 并将其写入带有分区的 parquet:

df.write
  .mode("append")
  .format("parquet")
  .partitionBy("customer_id", "date")
  .option("path", warehouse_location+"/"+table)
  .saveAsTable(table)

现在我面临的问题是 7 天和 30 天将在某个时间点(通过 1 天前)处理已处理的数据,因此在我的分区 table/customer_id/date/file.parquet 上,追加将附加第二个镶木地板文件分区。

但在这种特定情况下,我希望新的 parquet 文件覆盖以前的文件(因为 adwords csv 在生成的第一天和之后的 7 / 30 天之间会发生变化)。

我环顾四周,如果我尝试使用“覆盖”,它会覆盖整个表,而不仅仅是分区。

您对如何在此处进行操作有什么建议吗?

我不是 Spark 专家,现在我唯一的选择是拥有一个脚本,该脚本将根据文件时间戳清理该位置。但感觉这里不是正确的解决方案。

PS:我使用的是 Spark 2.4

【问题讨论】:

  • 什么版本的pyspark?
  • 我正在用 2.4 运行它,我会更新问题
  • 我遇到了同样的问题。在纸上,有一个 replace where 函数,它没有像描述的那样工作。解决方案是通过 sql 查询删除分区,然后追加分区。
  • @Christopher 感谢您的意见!这适用于 format('parquet') 吗?

标签: apache-spark hive pyspark


【解决方案1】:

基于SPARK-20236,你应该设置spark.sql.sources.partitionOverwriteMode="dynamic"属性,然后使用"overwrite"模式替换现有表中的单个分区。

【讨论】:

    猜你喜欢
    • 2022-01-26
    • 1970-01-01
    • 2017-09-13
    • 1970-01-01
    • 2015-06-12
    • 2021-06-22
    • 1970-01-01
    • 2017-03-09
    • 2021-11-05
    相关资源
    最近更新 更多