【问题标题】:How to write one Json file for each row from the dataframe in Scala/Spark and rename the files如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件
【发布时间】:2019-02-07 21:24:15
【问题描述】:

需要为数据框中的每一行创建一个 json 文件。我正在使用 PartitionBy 为每个文件创建子文件夹。有没有办法避免创建子文件夹并使用唯一键重命名 json 文件? 或任何其他选择?它是一个巨大的数据框,包含数千个 (~300K) 的唯一值,因此 Repartition 占用了大量资源并花费时间。谢谢。

df.select(Seq(col("UniqueField").as("UniqueField_Copy")) ++ 
df.columns.map(col): _*)       
.write.partitionBy("UniqueField")
.mode("overwrite").format("json").save("c:\temp\json\")

【问题讨论】:

    标签: json scala apache-spark


    【解决方案1】:

    将所有输出放在一个目录中

    您的示例代码在 DataFrameWriter 对象上调用 partitionBy。文档告诉我们这个函数:

    按文件系统上的给定列对输出进行分区。如果指定,则输出布局在文件系统上,类似于 Hive 的分区方案。例如,当我们按年和月对数据集进行分区时,目录布局如下所示:

    年=2016/月=01/

    年=2016/月=02/

    这就是您获得子目录的原因。只需删除对partitionBy 的调用即可将所有输出放在一个目录中。

    每个文件获取一行

    Spark SQL

    您有一个正确的想法,即按UniqueField 对数据进行分区,因为 Spark 每个分区写入一个文件。而不是使用DataFrameWriter的分区,你可以使用

    df.repartitionByRange(numberOfJson, $"UniqueField")
    

    获取所需数量的分区,每个分区一个 JSON。请注意,这需要您提前知道最终将获得的 JSON 数量。你可以计算它

    val numberOfJson = df.select(count($"UniqueField")).first.getAs[Long](0)
    

    但是,这会为您的查询添加一个额外的操作,这将导致您的 整个 数据集再次被计算。听起来您的数据集太大而无法放入内存,因此您需要仔细考虑使用df.cache(或df.checkpoint)进行缓存(或检查点)是否真的可以节省计算时间。 (对于不需要大量计算来创建的大型数据集,重新计算实际上可以更快)

    RDD

    使用 Spark SQL API 的替代方法是下拉到较低级别的 RDD。在this question 的回答中彻底讨论了 RDD 的按键分区(在 pyspark 中)。在 scala 中,您必须指定自定义 Partitioner,如 this question 中所述。

    重命名 Spark 的输出文件

    This 是一个相当普遍的问题,而 AFAIK 的共识是这是不可能的。

    希望对您有所帮助,欢迎使用 Stack Overflow!

    【讨论】:

      猜你喜欢
      • 2018-03-30
      • 2017-12-24
      • 2018-03-24
      • 1970-01-01
      • 1970-01-01
      • 2020-10-06
      • 1970-01-01
      • 1970-01-01
      • 2018-09-18
      相关资源
      最近更新 更多