【发布时间】:2019-11-20 02:03:06
【问题描述】:
使用 Python 3.6 在 Amazon EMR 集群(1 个主节点,2 个节点)上运行 Spark 2.4.2
我正在 Amazon s3 中读取对象,以 parquet 格式压缩它们,并将它们添加(附加)到现有的 parquet 数据存储中。当我在 pyspark shell 中运行我的代码时,我能够读取/压缩对象并将新的 parquet 文件添加到现有 parquet 文件中,当我对 parquet 数据运行查询时,它显示所有数据都在镶木地板文件夹。但是,当我在 EMR 集群上的某个步骤中运行代码时,现有 parquet 文件会被新文件覆盖。相同的查询会显示只有新数据存在,而 parquet 数据所在的 s3 文件夹也只有新数据。
这是步骤的关键代码:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我希望这会将input_folder 中的数据附加到parquet_folder 中的现有数据中,但在EMR 步骤中执行时会被覆盖。我试过在.saveAsTable 中没有mode='append'(在pyspark shell 中没有必要)。
建议?
【问题讨论】:
-
你试过用
.parquet(path)而不是.saveAsTable保存吗? -
@kadu 太好了!
.parquet(parquet_folder)似乎有效!你知道为什么我的代码可以在 pyspark shell 中运行,但不能一步到位吗?
标签: python apache-spark amazon-emr parquet