【问题标题】:How to extract values from JSON-encoded column? [duplicate]如何从 JSON 编码的列中提取值? [复制]
【发布时间】:2018-12-15 11:36:15
【问题描述】:

我有一个具有以下架构的 Spark Dataframe。

[{ "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-plan",
    "status": {
        "state": "SUCCESS"
    }
  },
  { "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-proc",
    "status": {
        "state": "FAILED"
  }
}]

我想将其转换为具有以下架构的另一个 DF [{"rec": "test-plan", "status": "SUCCESS"}, {"rec": "test-pROC", "status": "FAILED"}]

我编写了以下代码,但它无法编译并抱怨编码错误。

val fdf = DF.map(f => {
        val listCommands = f.get(0).asInstanceOf[WrappedArray[Map[String, Any]]]
        val m = listCommands.map(h => {
            var rec = "none"
            var status = "none"

            if(h.exists("status" == "state" -> _)) {
                status = (h.get("status") match {
                    case Some(x) => x.asInstanceOf[HashMap[String, String]].getOrElse("state", "none")
                    case _ => "none"
                })

                if(h.contains("rec")) {
                    rec = (h.get("rec") match {
                        case Some(x: String) => x
                        case _ => "none"
                    })
                }
            }

          Map("status"->status, "rec"->rec)
        })

      val rm = m.flatten
      rm
    })

请提出正确的方法。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    这会很棘手,因为 JSON 的顶级元素并不相同,即您有 map1map2,因此架构不一致。我会与“数据生产者”交谈并请求更改,以便命令的名称由单独的元素描述。


    给定DataFrame的架构如下:

    scala> commands.printSchema
    root
     |-- commands: array (nullable = true)
     |    |-- element: string (containsNull = true)
    

    以及其中的元素(行)数:

    scala> commands.count
    res1: Long = 1
    

    您必须首先explode commands 元素数组,然后访问感兴趣的字段。

    // 1. Explode the array
    val commandsExploded = commands.select(explode($"commands") as "command")
    scala> commandsExploded.count
    res2: Long = 2
    

    让我们创建 JSON 编码记录的架构。一种可能如下。

    // Note that it accepts map1 and map2 fields
    import org.apache.spark.sql.types._
    val schema = StructType(
      StructField("map1",
        StructType(
          StructField("completed-stages", LongType, true) ::
          StructField("total-stages", LongType, true) :: Nil), true) ::
      StructField("map2",
        StructType(
          StructField("completed-stages", LongType, true) ::
          StructField("total-stages", LongType, true) :: Nil), true) ::
      StructField("rec", StringType,true) ::
      StructField("status", StructType(
        StructField("state", StringType, true) :: Nil), true
      ) :: Nil)
    

    因此,您应该使用from_json 标准函数,该函数采用带有 JSON 编码字符串和架构的列。

    val commands = commandsExploded.select(from_json($"command", schema) as "command")
    scala> commands.show(truncate = false)
    +-------------------------------+
    |command                        |
    +-------------------------------+
    |[[1, 1],, test-plan, [SUCCESS]]|
    |[, [1, 1], test-proc, [FAILED]]|
    +-------------------------------+
    

    让我们看一下commands 数据集的架构。

    scala> commands.printSchema
    root
     |-- command: struct (nullable = true)
     |    |-- map1: struct (nullable = true)
     |    |    |-- completed-stages: long (nullable = true)
     |    |    |-- total-stages: long (nullable = true)
     |    |-- map2: struct (nullable = true)
     |    |    |-- completed-stages: long (nullable = true)
     |    |    |-- total-stages: long (nullable = true)
     |    |-- rec: string (nullable = true)
     |    |-- status: struct (nullable = true)
     |    |    |-- state: string (nullable = true)
    

    recstatus 这样的复杂字段是.-可访问的结构。

    val recs = commands.select(
      $"command.rec" as "rec",
      $"command.status.state" as "status")
    
    scala> recs.show
    +---------+-------+
    |      rec| status|
    +---------+-------+
    |test-plan|SUCCESS|
    |test-proc| FAILED|
    +---------+-------+
    

    将其转换为单记录 JSON 编码数据集需要 Dataset.toJSON 后跟 collect_list 标准函数。

    val result = recs.toJSON.agg(collect_list("value"))
    scala> result.show(truncate = false)
    +-------------------------------------------------------------------------------+
    |collect_list(value)                                                            |
    +-------------------------------------------------------------------------------+
    |[{"rec":"test-plan","status":"SUCCESS"}, {"rec":"test-proc","status":"FAILED"}]|
    +-------------------------------------------------------------------------------+
    

    【讨论】:

      【解决方案2】:

      您没有提供 df 的架构,因此以下内容可能不适合您。 我将 json 示例保存在 test.json 文件中,并使用 val df=spark.read.option("multiLine",true).json("test.json") 读取它,在这种情况下,您只需 df.select($"rec",$"status.state").write.json("test1.json") 即可获得所需的 json

      【讨论】:

        猜你喜欢
        • 2019-02-19
        • 2015-02-25
        • 2020-10-21
        • 2022-11-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-09-07
        相关资源
        最近更新 更多