【问题标题】:How to read and write DataFrame from Spark如何从 Spark 中读取和写入 DataFrame
【发布时间】:2017-04-03 21:35:23
【问题描述】:

我需要将 DataFrame 保存为 CSV 或 parquet 格式(作为单个文件),然后再次打开它。数据量不会超过60Mb,所以单个文件是合理的解决方案。这个简单的任务让我很头疼......这是我尝试过的:

读取文件(如果存在):

df = sqlContext
               .read.parquet("s3n://bucket/myTest.parquet")
               .toDF("key", "value", "date", "qty")

要写入文件:

df.write.parquet("s3n://bucket/myTest.parquet")

这不起作用,因为:

1) write 使用 hadoopish 文件创建文件夹 myTest.parquet,后来我无法使用 .read.parquet("s3n://bucket/myTest.parquet") 读取这些文件。事实上,我并不关心多个 hadoopish 文件,除非我以后可以轻松地将它们读入 DataFrame。有可能吗?

2) 我一直在使用我在 S3 中更新和覆盖的同一个文件 myTest.parquet。它告诉我该文件无法保存,因为它已经存在。

那么,有人可以告诉我进行读/写循环的正确方法吗?文件格式对我来说无关紧要(csv、parquet、csv、hadoopish 文件),除非我可以进行读写循环。

【问题讨论】:

标签: scala csv apache-spark parquet


【解决方案1】:

您可以使用saveAsTable("TableName") 保存您的DataFrame,并使用table("TableName") 读取它。并且可以通过spark.sql.warehouse.dir设置位置。您可以使用mode(SaveMode.Ignore) 覆盖文件。更多官方文档可以read这里。

在 Java 中它看起来像这样:

SparkSession spark = ...
spark.conf().set("spark.sql.warehouse.dir", "hdfs://localhost:9000/tables");
Dataset<Row> data = ...
data.write().mode(SaveMode.Overwrite).saveAsTable("TableName");

现在您可以通过以下方式读取数据:

spark.read().table("TableName");

【讨论】:

  • 可以请你举个例子吗?
  • 编辑了我的答案。抱歉,我的示例是用 Java 编写的,因为我在 Scala 方面不是很好。你使用 Spark 2.x.x 吗?据我所知,没有 SparkSession,但您应该可以使用 SparkContext 来完成。
  • 我使用 Spark 1.6.2,因为 Spark 2.0.0 目前不推荐用于生产。
猜你喜欢
  • 2018-05-23
  • 2019-04-20
  • 2019-09-20
  • 2020-06-12
  • 1970-01-01
  • 2020-03-16
  • 2018-12-03
  • 2021-03-11
  • 2017-08-21
相关资源
最近更新 更多