【问题标题】:How to read such a nested multiline json file into a data frame with Spark/Scala如何使用 Spark/Scala 将这样一个嵌套的多行 json 文件读入数据帧
【发布时间】:2021-03-12 00:18:12
【问题描述】:

我有以下 json:

{
    "value":[
            {"C1":"val1","C2":"val2"},
            {"C1":"val1","C2":"val2"},
            {"C1":"val1","C2":"val2"}
        ]
}

我想这样读:

spark.read
  .option("multiLine", true).option("mode", "PERMISSIVE")
  .json("/Projects.json")
  .show(10)

但它无法在数据框中正确显示我的记录,我如何绕过该“值”嵌套以正确地将我的行包含在数据框中?

当前结果:

我想要得到的结果是:

    C1   |   C2
-------------------
    VAL1 |   VAL2
    VAL1 |   VAL2
    ...etc

【问题讨论】:

  • 你希望它是什么样子的?
  • 我想要一个显示列的数据框:C1、C2 我在我的问题中添加了一个示例 :)
  • @mike 有什么想法吗? ^^ 我被卡住了
  • 我有时间仔细研究您的问题。猜猜使用 Spark 的 SQL 内置函数会更容易。

标签: scala apache-spark apache-spark-sql spark3


【解决方案1】:

查看 spark.read 返回的 Dataframe (jsonDf) 的架构:

jsonDf.printSchema()
root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C1: string (nullable = true)
 |    |    |-- C2: string (nullable = true)

你可以使用sql函数explode,然后选择C1C2这两个元素,如下图:

  val df = jsonDf
    .withColumn("parsedJson", explode(col("value")))
    .withColumn("C1", col("parsedJson.C1"))
    .withColumn("C2", col("parsedJson.C2"))
    .select(col("C1"), col("C2"))
    .show(false)

这会导致所需的结果:

+----+----+
|C1  |C2  |
+----+----+
|val1|val2|
|val1|val2|
|val1|val2|
+----+----+

【讨论】:

  • 我的列有一个字符串值,如 [{"C1":"val1","C2":"val2"},{"C1":"val1","C2":"val2" },{"C1":"val1","C2":"val2"}] 如何将其转换为上述格式?
【解决方案2】:

我终于设法使用以下函数找到了解决问题的方法:

  def flattenDataframe(df: DataFrame): DataFrame = {

    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    val length = fields.length
    
    for(i <- 0 to fields.length-1){
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match {
        case arrayType: ArrayType =>
          val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
         // val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
          val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
          return flattenDataframe(explodedDf)
        case structType: StructType =>
          val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
          val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_"))))
         val explodedf = df.select(renamedcols:_*)
          return flattenDataframe(explodedf)
        case _ =>
      }
    }
    df
  }

来源https://medium.com/@saikrishna_55717/flattening-nested-data-json-xml-using-apache-spark-75fa4c8ea2a7

【讨论】:

    【解决方案3】:

    使用inline 就可以了:

    val df = spark.read
      .option("multiLine", true).option("mode", "PERMISSIVE")
      .json("/Projects.json")
    
    val df2 = df.selectExpr("inline(value)")
    df2.show
    +----+----+
    |  C1|  C2|
    +----+----+
    |val1|val2|
    |val1|val2|
    |val1|val2|
    +----+----+
    

    【讨论】:

      猜你喜欢
      • 2023-03-25
      • 2020-08-24
      • 2021-05-10
      • 2021-07-02
      • 2019-04-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-12
      相关资源
      最近更新 更多