【问题标题】:Spark Dataframe issue in overwriting the partition data of Hive table覆盖 Hive 表分区数据的 Spark Dataframe 问题
【发布时间】:2019-12-27 20:41:34
【问题描述】:

下面是我的 Hive 表定义:

CREATE EXTERNAL TABLE IF NOT EXISTS default.test2(
id integer,
count integer
)
PARTITIONED BY (
fac STRING,
fiscaldate_str DATE )
STORED AS PARQUET
LOCATION 's3://<bucket name>/backup/test2';

我在 hive 表中有如下数据,(我只是插入了示例数据)

select * from default.test2

+---+-----+----+--------------+
| id|count| fac|fiscaldate_str|
+---+-----+----+--------------+
|  2|    3| NRM|    2019-01-01|
|  1|    2| NRM|    2019-01-01|
|  2|    3| NRM|    2019-01-02|
|  1|    2| NRM|    2019-01-02|
|  2|    3| NRM|    2019-01-03|
|  1|    2| NRM|    2019-01-03|
|  2|    3|STST|    2019-01-01|
|  1|    2|STST|    2019-01-01|
|  2|    3|STST|    2019-01-02|
|  1|    2|STST|    2019-01-02|
|  2|    3|STST|    2019-01-03|
|  1|    2|STST|    2019-01-03|
+---+-----+----+--------------+

此表在两列(fac、fictiondate_str)上分区,我们正在尝试通过使用 spark 数据帧 - 数据帧编写器在分区级别动态执行插入覆盖。

但是,在尝试此操作时,我们要么得到重复数据,要么所有其他分区都被删除。

以下是使用 spark 数据帧的代码 sn-ps。

首先我将数据框创建为

df = spark.createDataFrame([(99,99,'NRM','2019-01-01'),(999,999,'NRM','2019-01-01')], ['id','count','fac','fiscaldate_str'])

df.show(2,False)
+---+-----+---+--------------+
|id |count|fac|fiscaldate_str|
+---+-----+---+--------------+
|99 |99   |NRM|2019-01-01    |
|999|999  |NRM|2019-01-01    |
+---+-----+---+--------------+
  1. 使用下面的 sn-p 得到重复,

    df.coalesce(1).write.mode("overwrite").insertInto("default.test2")

  2. 所有其他数据都被删除,只有新数据可用。

    df.coalesce(1).write.mode("overwrite").saveAsTable("default.test2")

   df.createOrReplaceTempView("tempview")

tbl_ald_kpiv_hist_insert = spark.sql("""
INSERT OVERWRITE TABLE default.test2 
partition(fac,fiscaldate_str) 
select * from tempview
""")

我将 AWS EMR 与 Spark 2.4.0 和 Hive 2.3.4-amzn-1 以及 S3 一起使用。

谁能知道为什么我不能将数据动态覆盖到分区中?

【问题讨论】:

  • 你可以尝试设置下面的属性。 spark.conf.set("spark.sql.sources.partitionOverwriteMode","d‌​ynamic")

标签: apache-spark hive pyspark apache-spark-sql partition


【解决方案1】:

您的问题不太容易理解,但我认为您的意思是要覆盖分区。如果是这样,那么这就是你所需要的,你所需要的——第二行:

df = spark.createDataFrame([(99,99,'AAA','2019-01-02'),(999,999,'BBB','2019-01-01')], ['id','count','fac','fiscaldate_str'])
df.coalesce(1).write.mode("overwrite").insertInto("test2",overwrite=True) 

注意覆盖=真。由于正在使用 DF.writer,因此所做的评论既不存在也不存在。我不是在解决合并问题(1)。

对提问者的评论

我在 Databricks Notebook 上按照标准运行 - 在此处进行原型设计和回答时 - 并明确设置以下内容,并且效果很好:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","static")
spark.conf.set("hive.exec.dynamic.partition.mode", "strict")

您要求更新答案:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","d‌​ynamic").

可以像我刚才那样做;可能在您的环境中这是需要的,但我当然不需要这样做。

20 年 19 月 3 日更新

这适用于之前的 Spark 版本,现在适用于以下应用:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// In Databricks did not matter the below settings
//spark.conf.set("hive.exec.dynamic.partition", "true")
//spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

Seq(("CompanyA1", "A"), ("CompanyA2", "A"), 
    ("CompanyB1", "B"))
.toDF("company", "id")
.write
.mode(SaveMode.Overwrite)
.partitionBy("id")
.saveAsTable("KQCAMS9")

spark.sql(s"SELECT * FROM KQCAMS9").show(false)

val df = Seq(("CompanyA3", "A"))
.toDF("company", "id")
// disregard coalsece
df.coalesce(1).write.mode("overwrite").insertInto("KQCAMS9") 

spark.sql(s"SELECT * FROM KQCAMS9").show(false)
spark.sql(s"show partitions KQCAMS9").show(false)

现在从 2.4.x 开始一切正常。以后。

【讨论】:

  • 谢谢蓝幻。有效。我唯一需要在您的答案中添加更多内容的是属性 spark.conf.set("spark.sql.sources.partitionOverwriteMode","d‌​ynamic")。如果可能,请用它更新答案。
猜你喜欢
  • 1970-01-01
  • 2018-10-03
  • 1970-01-01
  • 2016-07-15
  • 2018-12-11
  • 2022-01-18
  • 1970-01-01
  • 1970-01-01
  • 2019-10-27
相关资源
最近更新 更多