【问题标题】:parse json data with spark 2.3使用 spark 2.3 解析 json 数据
【发布时间】:2021-11-03 09:54:30
【问题描述】:

我有以下 json 数据:

{
  "3200": {
    "id": "3200",
    "value": [
      "cat",
      "dog"
    ]
  },
  "2000": {
    "id": "2000",
    "value": [
      "bird"
    ]
  },
  "2500": {
    "id": "2500",
    "value": [
      "kitty"
    ]
  },
  "3650": {
     "id": "3650",
      "value": [
      "horse"
    ]
  }
}

这个数据的schema,我们用spark加载数据后的printSchema工具如下:

    root
 |-- 3200: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2000: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 2500: struct (nullable = true)
 |     |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 3650: struct (nullable = true)
 |   |-- id: string (nullable = true)
 |    |-- value: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

我想得到以下数据框

id    value

 3200  cat
 2000  bird
 2500  kitty
 3200  dog
 3650  horse 

如何进行解析以获得预期的输出

【问题讨论】:

    标签: json apache-spark apache-spark-sql


    【解决方案1】:

    使用 spark-sql

    数据框步骤(与 Mohana 的回答相同)

    val df = spark.read.json(Seq(jsonData).toDS())
    

    构建临时视图

    df.createOrReplaceTempView("df")
    

    结果:

    val cols_k = df.columns.map( x => s"`${x}`.id" ).mkString(",")
    val cols_v = df.columns.map( x => s"`${x}`.value" ).mkString(",")
    spark.sql(s""" 
    with t1 ( select map_from_arrays(array(${cols_k}),array(${cols_v})) s from df ),
         t2 ( select explode(s) (key,value) from t1 )
         select key, explode(value) value from t2
    
    """).show(false)
    
    +----+-----+
    |key |value|
    +----+-----+
    |2000|bird |
    |2500|kitty|
    |3200|cat  |
    |3200|dog  |
    |3650|horse|
    +----+-----+
    

    【讨论】:

    • 你能用scala或者python写吗
    【解决方案2】:

    您可以使用stack() 函数转置数据帧,然后提取key 字段并使用explode_outer 函数分解value 字段。

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    import spark.implicits._
    
    val jsonData = """{
                       |  "3200": {
                       |    "id": "3200",
                       |    "value": [
                       |      "cat",
                       |      "dog"
                       |    ]
                       |  },
                       |  "2000": {
                       |    "id": "2000",
                       |    "value": [
                       |      "bird"
                       |    ]
                       |  },
                       |  "2500": {
                       |    "id": "2500",
                       |    "value": [
                       |      "kitty"
                       |    ]
                       |  },
                       |  "3650": {
                       |     "id": "3650",
                       |      "value": [
                       |      "horse"
                       |    ]
                       |  }
                       |}
                       |""".stripMargin
    
    val df = spark.read.json(Seq(jsonData).toDS())
    
    df.selectExpr("stack (4, *) key")
        .select(expr("key.id").as("key"),
          explode_outer(expr("key.value")).as("value"))
        .show(false)
    
    +----+-----+
    |key |value|
    +----+-----+
    |2000|bird |
    |2500|kitty|
    |3200|cat  |
    |3200|dog  |
    |3650|horse|
    +----+-----+
    

    【讨论】:

    • 我们可以有一个替代方案,而不用硬编码 4 号
    • 使用df.columns.length
    猜你喜欢
    • 2021-09-24
    • 2017-07-23
    • 1970-01-01
    • 1970-01-01
    • 2020-01-18
    • 2017-05-23
    • 2020-01-19
    • 1970-01-01
    • 2012-11-10
    相关资源
    最近更新 更多