【问题标题】:Spark Scala - Split Array of Structs into Dataframe ColumnsSpark Scala - 将结构数组拆分为数据框列
【发布时间】:2021-07-29 09:05:03
【问题描述】:

我有一个包含结构数组的嵌套源 json 文件。结构的数量因行而异,我想使用 Spark(scala)从结构的键/值动态创建新的数据框列,其中键是列名,值是列值。

缩小的 json 记录示例

{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}

数据框架构

scala> val df = spark.read.json("file:///tmp/nested_test.json")
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

到目前为止做了什么

df.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")).
    show(truncate=false)

+----+----+----+----+----------------------------------------------------------------------------+
|key3|key4|key6|key7|values                                                                      |
+----+----+----+----+----------------------------------------------------------------------------+
|AK  |EU  |001 |N   |[[valuesColumn1, 9.876], [valuesColumn2, 1.2345], [valuesColumn3, 8.675309]]|
+----+----+----+----+----------------------------------------------------------------------------+

这里有一个由 3 个结构组成的数组,但是这 3 个结构需要动态地拆分为 3 个单独的列(3 个的数量可能会有很大差异),我不知道该怎么做。

样本期望输出

请注意,values 数组中的每个数组元素都生成了 3 个新列。

+----+----+----+----+-----------------------------------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-----------------------------------------+
|AK  |EU  |001 |N   |9.876        |1.2345        |8.675309    |
+----+----+----+----+-----------------------------------------+

参考

我相信所需的解决方案是something similar to what was discussed in this SO post,但有两个主要区别:

  1. 在 SO 帖子中,列数被硬编码为 3,但在我的情况下,数组元素的数量是未知的
  2. 列名需要由name 列驱动,列值由value 驱动。
...
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

【问题讨论】:

    标签: json scala apache-spark


    【解决方案1】:

    你可以这样做:

    val sac = new SparkContext("local[*]", " first Program");
    val sqlc = new SQLContext(sac);
    import sqlc.implicits._;
    import org.apache.spark.sql.functions.split
    import scala.math._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions.{ min, max }
    
    val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""
    
    val df1 = sqlc.read.json(Seq(json).toDS())
    
    val df2 = df1.select(
        ($"key1.key2.key3").as("key3"),
        ($"key1.key2.key4").as("key4"),
        ($"key1.key2.key5.key6").as("key6"),
        ($"key1.key2.key5.key7").as("key7"),
        ($"key1.key2.key5.values").as("values")
    )
    
    val numColsVal = df2
        .withColumn("values_size", size($"values"))
        .agg(max($"values_size"))
        .head()
        .getInt(0)
    
    val finalDFColumns = df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null))).columns
    val finalDF = df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
    finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
    finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)
    

    得到的最终输出为:

    +----+----+----+----+-------------+-------------+-------------+
    |key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
    +----+----+----+----+-------------+-------------+-------------+
    |AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
    +----+----+----+----+-------------+-------------+-------------+
    

    希望我的问题没有错!

    ----------- 编辑说明----------

    此块获取要为数组结构创建的列数。

    val numColsVal = df2
            .withColumn("values_size", size($"values"))
            .agg(max($"values_size"))
            .head()
            .getInt(0)
    

    finalDFColumns 是使用所有预期列作为空值输出创建的 DF。

    下面的块返回需要从数组结构中创建的不同列。

    df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect
    

    下面的块将上面的新列与df2 中的其他列组合在一起,用空/空值初始化。

    foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))
    

    如果您打印输出,则将这两个块组合起来:

    +----+----+----+----+------+-------------+-------------+-------------+
    |key3|key4|key6|key7|values|valuesColumn1|valuesColumn2|valuesColumn3|
    +----+----+----+----+------+-------------+-------------+-------------+
    +----+----+----+----+------+-------------+-------------+-------------+
    

    现在我们已经准备好了结构。我们需要这里对应列的值。下面的块为我们提供了值:

    df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
    

    结果如下:

    +----+----+----+----+--------------------+---------------+---------------+---------------+
    |key3|key4|key6|key7|              values|values[0][name]|values[1][name]|values[2][name]|
    +----+----+----+----+--------------------+---------------+---------------+---------------+
    |  AK|  EU| 001|   N|[[valuesColumn1, ...|          9.876|         1.2345|       8.675309|
    +----+----+----+----+--------------------+---------------+---------------+---------------+
    
    

    现在我们需要像上面第一个块中那样重命名列。所以我们将使用zip 函数合并列,然后使用 foldLeft 方法重命名输出列,如下所示:

    finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
    

    这导致以下结构:

    +----+----+----+----+--------------------+-------------+-------------+-------------+
    |key3|key4|key6|key7|              values|valuesColumn1|valuesColumn2|valuesColumn3|
    +----+----+----+----+--------------------+-------------+-------------+-------------+
    |  AK|  EU| 001|   N|[[valuesColumn1, ...|        9.876|       1.2345|     8.675309|
    +----+----+----+----+--------------------+-------------+-------------+-------------+
    
    

    我们快到了。我们现在只需要像这样删除不需要的values 列:

    finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)
    

    因此导致预期输出如下 -

    +----+----+----+----+-------------+-------------+-------------+
    |key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
    +----+----+----+----+-------------+-------------+-------------+
    |AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
    +----+----+----+----+-------------+-------------+-------------+
    

    我不确定我是否能够清楚地解释它。但是,如果您尝试破坏上述语句/代码并尝试打印它,您将了解我们在输出之前是如何到达的。您可以在互联网上找到该逻辑中使用的不同功能的示例说明。

    【讨论】:

    • 感谢您的回复,这似乎可以解决问题,但是您能否提供一些关于这里发生的事情的详细信息?
    • 我试图解释用于获取输出的逻辑部分。不确定我是否解释得很好:) 用解释编辑了相同的答案。如果您觉得有帮助,请将其标记为答案。
    【解决方案2】:

    我发现这种方法表现得更好,并且使用分解和枢轴更容易理解:

    val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""
    
    val df = spark.read.json(Seq(json).toDS())
    
    // schema
    df.printSchema
    root
     |-- key1: struct (nullable = true)
     |    |-- key2: struct (nullable = true)
     |    |    |-- key3: string (nullable = true)
     |    |    |-- key4: string (nullable = true)
     |    |    |-- key5: struct (nullable = true)
     |    |    |    |-- key6: string (nullable = true)
     |    |    |    |-- key7: string (nullable = true)
     |    |    |    |-- values: array (nullable = true)
     |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |-- name: string (nullable = true)
     |    |    |    |    |    |-- value: string (nullable = true)
    
    // create final df
    val finalDf = df.
        select(
          $"key1.key2.key3".as("key3"),
          $"key1.key2.key4".as("key4"),
          $"key1.key2.key5.key6".as("key6"),
          $"key1.key2.key5.key7".as("key7"),
          explode($"key1.key2.key5.values").as("values")
        ).
        groupBy(
          $"key3", $"key4", $"key6", $"key7"
        ).
        pivot("values.name").
        agg(min("values.value")).alias("values.name")
    
    // result
    finalDf.show
    +----+----+----+----+-------------+-------------+-------------+
    |key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
    +----+----+----+----+-------------+-------------+-------------+
    |  AK|  EU| 001|   N|        9.876|       1.2345|     8.675309|
    +----+----+----+----+-------------+-------------+-------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-26
      • 2022-01-26
      • 2020-12-06
      • 2017-11-03
      • 1970-01-01
      相关资源
      最近更新 更多