【问题标题】:Array of struct parsing in Spark dataframeSpark数据框中的结构解析数组
【发布时间】:2020-08-11 19:05:23
【问题描述】:

我有一个带有一个结构类型列的数据框。示例数据框架构是:

root
 |-- Data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)

字段name 保存列名,字段value 保存列值。 Data 列中的元素数未定义,因此可能会有所不同。我需要解析这些数据并摆脱嵌套结构。 (数组Explode 在这种情况下不起作用,因为一行中的数据属于一个元素)。真正的模式要大得多,并且有多个像“数据”这样的数组字段,所以我的目标是创建一个通用解决方案,我将应用于类似的结构数组。 示例:

样本数据:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }]} { "name": "LName",   "value": "Marley"  }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))

预期结果:

+-------+------+
|  FName| LName|
+-------+------+
|   Alex|Strong|
|Robert |Marley|
+-------+------+
 

作为解决方案,我创建了一个在整个 Data 列上执行的 UDF。作为输入参数,我传递了列名和要提取的字段名。

 val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
    var value = ""
    arr.foreach(el =>
        if(el.getAs[String]("name") == columnName){
            value = el.getAs[String]("value")
        }
    )
    value
}}

问题是我使用变量value 来存储中间结果,我不想为要执行我的UDF 的每一行创建一个新的变量。

我执行 UDF 的方式(该查询生成预期结果):

df.select(find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")).show()

我很高兴听到任何有关如何改进 UDF 逻辑以及如何解决解析问题的不同方法的 cmets。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    我通过用find 方法替换foreach 循环解决了这个问题:

    val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
        arr.find(_.getAs[String]("name") == columnName) match {
            case Some(i) => i.getAs[String]("value")
            case None => null
        }
    }
    

    【讨论】:

      【解决方案2】:

      也许这有帮助-

        val data = Seq(
            """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
            """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }, {
              |"name": "LName",   "value": "Marley"  }]}""".stripMargin
          )
          val df = spark.read
            .json(data.toDS())
          df.show(false)
          df.printSchema()
      
          /**
            * +----------------------------------------------------+
            * |Data                                                |
            * +----------------------------------------------------+
            * |[[FName, Alex], [LName, Strong]]                    |
            * |[[FName, Robert ], [MName, Nesta ], [LName, Marley]]|
            * +----------------------------------------------------+
            *
            * root
            * |-- Data: array (nullable = true)
            * |    |-- element: struct (containsNull = true)
            * |    |    |-- name: string (nullable = true)
            * |    |    |-- value: string (nullable = true)
            */
      
          df.selectExpr("inline_outer(Data)")
            .groupBy()
            .pivot("name")
            .agg(collect_list("value"))
            .withColumn("x", arrays_zip($"FName", $"LName"))
            .selectExpr("inline_outer(x)")
            .show(false)
      
          /**
            * +-------+------+
            * |FName  |LName |
            * +-------+------+
            * |Alex   |Strong|
            * |Robert |Marley|
            * +-------+------+
            */
      

      【讨论】:

        猜你喜欢
        • 2018-08-01
        • 2018-12-03
        • 2021-09-24
        • 2021-09-28
        • 1970-01-01
        • 2021-07-29
        • 1970-01-01
        • 2018-05-13
        • 2021-03-13
        相关资源
        最近更新 更多