【问题标题】:Spark DataFrame Read And WriteSpark DataFrame 读写
【发布时间】:2019-04-20 22:57:56
【问题描述】:

我有一个用例,我必须将数百万个 json 格式的数据加载到 Apache Hive 表中。 所以我的解决方案很简单,将它们加载到数据框中并将它们写入 Parquet 文件。 然后我将在它们上创建一个外部表。

我正在使用 Apache Spark 2.1.0 和 scala 2.11.8。

所有消息都遵循一种灵活的模式。 例如,“金额”列的值可以是 1.0 或 1。

由于我正在将数据从半结构化格式转换为结构化格式,但我的架构稍微有点 变量,我已经通过考虑像 json 这样的数据源的 inferSchema 选项来补偿我。

spark.read.option("inferSchema","true").json(RDD[String])

当我在读取 json 数据时将 inferSchema 设为 true 时,

案例1:对于较小的数据,所有parquet文件的数量都是双倍的。

案例 2:对于较大的数据,一些 parquet 文件的数量为 double,而其他文件的数量为 int64。

我尝试调试并发现某些概念,例如架构演变和架构合并 超越了我的头脑,让我的疑问多于答案。

我的疑问/问题是

  1. 当我尝试推断架构时,它不会将推断的架构强制执行到完整数据集吗?

  2. 由于我的限制,我无法强制执行任何架构,我想将整个 列到 double 数据类型,因为它可以同时具有整数和十进制数。 有没有更简单的方法?

  3. 我的猜测是,由于数据是分区的,所以 inferSchema 每个分区工作,然后 它给了我一个通用的模式,但它没有做任何像强制模式或任何事情 这种类型的。如果我错了,请纠正我。

注意:我使用 inferSchema 选项的原因是因为传入的数据过于灵活/可变 强制执行我自己的案例类,尽管某些列是强制性的。如果您有更简单的解决方案,请提出建议。

【问题讨论】:

    标签: scala apache-spark hive


    【解决方案1】:

    推断模式实际上只是处理所有行以查找类型。 一旦这样做,它就会合并结果以找到整个数据集共有的模式。

    例如,您的某些字段可能在某些行中具有值,但在其他行中没有。因此,该字段的推断模式将变为可为空。

    要回答您的问题,可以为您的输入推断架构。 但是,由于您打算在 Hive 中使用输出,因此应确保所有输出文件具有相同的架构。

    一个简单的方法是使用强制转换(如您所建议的那样)。我通常喜欢在工作的最后阶段进行选择,然后只列出所有的列和类型。我觉得这使这项工作更具人类可读性。

    例如

    df
    .coalesce(numOutputFiles)
    .select(
      $"col1"        .cast(IntegerType).as("col1"),
      $"col2"        .cast( StringType).as("col2"),
      $"someOtherCol".cast(IntegerType).as("col3")
    )
    .write.parquet(outPath)
    

    【讨论】:

    • 因此,inferSchema 不对数据强制执行模式。它读取所有行并只给我一个合并的模式??包含不同数据类型的同一列可以吗??
    • 只要列是正确的类型就可以了。例如Long 也可以保存整数值。 Double 可以容纳 Float 和 Integer。
    • 谢谢。我还有一个疑问,说其中一行的“金额”作为字符串,它是与 double 完全不同的类型,那么行为是什么?
    • 即使火花很好,同一列具有不同数据类型的数据,我怀疑 hive 和 impala 是否可以接受?当我尝试从 impala 中的 中选择 * 时,它给我带来了列类型不兼容的错误。 spark sql不应该遵循相同的标准吗??
    猜你喜欢
    • 2018-12-03
    • 2017-04-03
    • 2018-05-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-16
    • 2021-04-05
    • 1970-01-01
    相关资源
    最近更新 更多