【Question title】: Export Spark dataframe as JSON array with custom metadata
【Posted】: 2020-07-01 19:56:06
【Question】:

I have some JSON documents stored in my MongoDB. Each document looks like: {"businessData":{"capacity":{"fuelCapacity":282}, ..}

After reading all the documents, I want to export them as a valid JSON file. Specifically:

// Read JSON data from the DB
val df: DataFrame = MongoSpark.load(sparkSession, readConfig)
df.show
// Export into the file system
df.coalesce(1).write.mode(SaveMode.Overwrite).json("export.json")
// The show command only shows the .json values
+--------------------+
|        businessData|
+--------------------+
|[[282],0,[true,20...|
|[[280],0,[true,20...|
|[[290],0,[true,20...|
|[[292],0,[true,20...|
|[[282],16,[true,2...|
+--------------------+

// export.json
{"businessData":{"capacity":{"fuelCapacity":282}, ..}
{"businessData":{"capacity":{"fuelCapacity":280}, ..}
{"businessData":{"capacity":{"fuelCapacity":290}, ..}
{"businessData":{"capacity":{"fuelCapacity":292}, ..}
{"businessData":{"capacity":{"fuelCapacity":282}, ..}

But when I export to the file system, I want to combine these 5 rows into a single array and add some custom metadata. For example:

{
  "metadata" : { "exportTime": "20/20/2020" , ...},
  "allBusinessData" : [
    {"businessData":{"capacity":{"fuelCapacity":282}, ..},
    // all 5 rows from above
  ]
}

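I have seen the questions here and here, which advise against doing this. They also only partially answer my question, because they do not add a custom JSON structure to the export.

However, assuming this is the only way I can proceed, how would I do it?

Thanks a lot!

【Question comments】:

    Tags: json mongodb scala apache-spark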


    【Solution 1】:

    From Spark 2.2+:

    You can try either using to_json or creating a struct<array<...etc>> field in Spark, and then writing the df in JSON format to get the desired output.

    • For the sample data, I assume exporttime is current_timestamp()

    Example:

    import spark.implicits._ // provides .toDS; already in scope inside spark-shell
    val df=spark.read.json(Seq("""[{"businessData":{"capacity":{"fuelCapacity":282}}},{"businessData":{"capacity":{"fuelCapacity":456}}}]""").toDS)
    
    //creating a struct field called metadata and write data in json format.
    df.selectExpr("struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData) as metadata").write.format("json").mode("overwrite").save("json_path")
    
    //using .to_json to create json object in dataframe
    df.selectExpr("to_json(struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData))metadata").show(false)
    
    //+-------------------------------------------------------------------------------------------------------------------------------------------------------+
    //|metadata                                                                                                                                               |
    //+-------------------------------------------------------------------------------------------------------------------------------------------------------+
    //|{"exporttime":"2020-03-21T15:17:54.769-05:00","allBusinessData":{"businessData":[{"capacity":{"fuelCapacity":282}},{"capacity":{"fuelCapacity":456}}]}}|
    //+-------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    //using  .toJSON to view json in shell(non-prod use only)
    df.selectExpr("struct(current_timestamp() as exporttime,struct(collect_list(businessData) as businessData)as allBusinessData)metadata").toJSON.collect()
    
    //Array[String] = Array({"metadata":{"exporttime":"2020-03-21T15:19:35.890-05:00","allBusinessData":{"businessData":[{"capacity":{"fuelCapacity":282}},{"capacity":{"fuelCapacity":456}}]}}})
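Note that .write.format("json").save("json_path") produces a directory of part files rather than one named file. Because the aggregate above yields exactly one row, one option is to bring that single JSON string to the driver and write it with plain Java NIO. This is only a sketch under assumptions: the two sample rows stand in for the MongoDB data, the output path export-single.json is hypothetical, and the whole export must fit in driver memory.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("single-file-export-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-in for the DataFrame loaded from MongoDB (sample values from the answer).
val df = spark.read.json(Seq(
  """{"businessData":{"capacity":{"fuelCapacity":282}}}""",
  """{"businessData":{"capacity":{"fuelCapacity":456}}}"""
).toDS)

// Same expression as in the answer: a global aggregate, so the result has one row.
val document: String = df.selectExpr(
  "struct(current_timestamp() as exporttime, " +
    "struct(collect_list(businessData) as businessData) as allBusinessData) as metadata"
).toJSON.head()

// Hypothetical target path; written on the driver, not through Spark's writer.
val target = Paths.get("export-single.json")
Files.write(target, document.getBytes(StandardCharsets.UTF_8))
```

For large collections this defeats the purpose of Spark, since collect_list gathers every row into a single value, so it is probably only reasonable for small exports.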
    

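    【Comments】:

    • Hi, I have to read with the MongoSpark driver (MongoSpark.load), which has no .toDS method. I still tried your solution and it almost answers my question. I get "allBusinessData":{ instead of the desired "allBusinessData":[{... (note the opening square bracket [)
    • @user1485864, you don't need to worry about .toDS... I only used .toDS to read the sample string as JSON. In your case, your df already holds the JSON data.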
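Regarding the bracket issue raised in the first comment: the extra object appears because collect_list is wrapped in an inner struct. A possible variant, sketched below under assumptions, collects struct(businessData) directly so that allBusinessData becomes an array of {"businessData": ...} objects, with metadata as a sibling top-level field. The sample rows stand in for the result of MongoSpark.load, and the field name exportTime is taken from the question's example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, current_timestamp, struct}

// Local session for the sketch; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("export-shape-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-in for MongoSpark.load(sparkSession, readConfig).
val df = spark.read.json(Seq(
  """{"businessData":{"capacity":{"fuelCapacity":282}}}""",
  """{"businessData":{"capacity":{"fuelCapacity":456}}}"""
).toDS)

// One output row with two top-level fields:
//   metadata        -> struct holding the export time
//   allBusinessData -> array with one {"businessData": ...} struct per input row
val wrapped = df.agg(
  struct(current_timestamp().as("exportTime")).as("metadata"),
  collect_list(struct($"businessData")).as("allBusinessData")
)

val json: String = wrapped.toJSON.head()
println(json)
```

The printed string should start with {"metadata":{"exportTime": and contain "allBusinessData":[{"businessData": as requested. The order of elements produced by collect_list is not guaranteed, and writing wrapped with .write.json(...) instead of printing it still yields a directory of part files.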