【问题标题】:Corrupt rows written to __HIVE_DEFAULT_PARTITION__ when attempting to overwrite Hive partition尝试覆盖 Hive 分区时写入 __HIVE_DEFAULT_PARTITION__ 的损坏行
【发布时间】:2019-05-28 02:55:57
【问题描述】:

我在尝试使用 Spark 2.3 覆盖 Hive 表中的分区时看到一些非常奇怪的行为

首先,我在构建 SparkSession 时设置了以下设置:

.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

然后我将一些数据复制到新表中并按 date_id 列进行分区。

ds
  .write
  .format("parquet")
  .option("compression", "snappy")
  .option("auto.purge", "true")
  .mode(saveMode)
  .partitionBy("date_id")
  .saveAsTable("tbl_copy")

在 HDFS 中可以看到相关的 date_id 目录已经创建完毕。

然后,我创建一个包含我希望覆盖的分区的数据的 DataSet,其中包含单个 date_id 的数据,并按如下方式插入 Hive:

  ds
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("tbl_copy")

作为健全性检查,我将相同的数据集写入新表。

      ds
        .write
        .format("parquet")
        .option("compression", "snappy")
        .option("auto.purge", "true")
        .mode(SaveMode.Overwrite)
        .saveAsTable("tmp_tbl")

tmp_tbl 中的数据完全符合预期。

但是,当我查看 tbl_copy 时,我看到了一个新的 HDFS 目录 `date_id=HIVE_DEFAULT_PARTITION

查询 tbl_cpy

SELECT * from tbl_copy WHERE date_id IS NULL

我看到本应插入分区 date_id=20180523 的行,但 date_id 列为空,并且已用值 20180523 填充了不相关的 row_changed 列。

似乎插入 Hive 以某种方式导致我的数据被破坏。将相同的数据集写入新表不会导致任何问题。

有人能解释一下吗?

【问题讨论】:

  • 您有架构更改吗?
  • Dataset 的架构完全相同,Dataset 正在正确写入新表。我在过去联合两个数据集时遇到过问题,这些数据集由于列的排序而出现问题。我已将我正在写入的数据集中的列重新排序为与被覆盖的表相同,但问题仍然存在。

标签: apache-spark hive apache-spark-sql


【解决方案1】:

所以看来分区列必须是数据集中的最后一个。

我通过将以下方法添加到 Dataset[T] 上解决了这个问题。

def partitionsTail(partitionColumns: Seq[String]) = {
  val columns = dataset.schema.collect{ case s if !partitionColumns.contains(s.name) => s.name} ++ partitionColumns

  dataset.select(columns.head, columns.tail: _*).as[T]
} 

【讨论】:

  • 您能详细说明一下吗?不确定为什么会这样。
【解决方案2】:

是的,这是一个棘手的行为,请在文档中解释:

https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/DataFrameWriter.html#insertInto(java.lang.String)

与 saveAsTable 不同,insertInto 忽略列名并仅使用 基于位置的分辨率。例如:

    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
    scala> sql("select * from t1").show
    +---+---+
    |  i|  j|
    +---+---+
    |  5|  6|
    |  3|  4|
    |  1|  2|
    +---+---+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-06-30
    • 2017-03-01
    • 1970-01-01
    • 2018-10-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-10
    相关资源
    最近更新 更多