【问题标题】:EMR Spark step to append to parquet files is overwriting parquet files附加到镶木地板文件的 EMR Spark 步骤正在覆盖镶木地板文件
【发布时间】:2019-11-20 02:03:06
【问题描述】:

使用 Python 3.6 在 Amazon EMR 集群(1 个主节点,2 个节点)上运行 Spark 2.4.2

我正在 Amazon s3 中读取对象,以 parquet 格式压缩它们,并将它们添加(附加)到现有的 parquet 数据存储中。当我在 pyspark shell 中运行我的代码时,我能够读取/压缩对象并将新的 parquet 文件添加到现有 parquet 文件中,当我对 parquet 数据运行查询时,它显示所有数据都在镶木地板文件夹。但是,当我在 EMR 集群上的某个步骤中运行代码时,现有 parquet 文件会被新文件覆盖。相同的查询会显示只有新数据存在,而 parquet 数据所在的 s3 文件夹也只有新数据。

这是步骤的关键代码:

    spark = SparkSession.builder \
                        .appName("myApp") \
                        .getOrCreate()

    df_p = spark.read \
                .format('parquet') \
                .load(parquet_folder)

    the_schema = df_p.schema

    df2 = spark.read \
               .format('com.databricks.spark.xml') \
               .options(rowTag='ApplicationSubmission', \
                        path=input_folder) \
               .schema(the_schema) \
               .load(input_folder+'/*.xml')

    df2.coalesce(10) \
       .write \
       .option('compression', 'snappy') \
       .option('path', parquet_folder) \
       .format('parquet') \
       .mode('append') \
       .saveAsTable(table_name, mode='append')

我希望这会将input_folder 中的数据附加到parquet_folder 中的现有数据中,但在EMR 步骤中执行时会被覆盖。我试过在.saveAsTable 中没有mode='append'(在pyspark shell 中没有必要)。

建议?

【问题讨论】:

  • 你试过用.parquet(path)而不是.saveAsTable保存吗?
  • @kadu 太好了! .parquet(parquet_folder) 似乎有效!你知道为什么我的代码可以在 pyspark shell 中运行,但不能一步到位吗?

标签: python apache-spark amazon-emr parquet


【解决方案1】:

我不知道为什么你的方法不起作用,但我使用 .parquet(path) 而不是 .saveAsTable(...) 获得了更好的结果。我不知道这种行为的原因,但我之前没有看到 saveAsTable 用于保存数据对象,因为它在 Hive 元存储中创建了一个表(这不是“物理”数据对象)。

如果您的步骤通过 Apache Livy 运行,它们的行为可能与在 shell 上的行为不同。如果您确实在使用 Livy,您可以在 Zeppelin 笔记本上测试您的代码,在您的代码单元上指出您应该使用 %livy-pyspark 执行程序运行它们。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-12-22
    • 1970-01-01
    • 2019-06-02
    • 2016-07-04
    • 2019-02-05
    • 2022-11-11
    • 1970-01-01
    • 2019-09-23
    相关资源
    最近更新 更多