【Question title】: How can I get a flattened DataFrame from a nested JSON containing arrays?
【Posted】: 2019-12-02 17:25:36
【Question description】:

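I just started writing Spark in Scala, and I'm trying to flatten a JSON file into a DataFrame so I can work with it in Hadoop. I'm running into problems with nested JSON that contains arrays. Does anyone have suggestions for flattening the JSON into a DataFrame?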

This is my schema:

root
 |-- Destination: struct (nullable = true)
 |    |-- DestinationId: string (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- MessageTime: string (nullable = true)
 |    |-- MessageVersion: string (nullable = true)
 |-- Payload: struct (nullable = true)
 |    |-- Sensors: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Location: struct (nullable = true)
 |    |    |    |    |-- Lat: string (nullable = true)
 |    |    |    |    |-- Lng: string (nullable = true)
 |    |    |    |-- Measures: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- PropertyId: string (nullable = true)
 |    |    |    |    |    |-- Time: string (nullable = true)
 |    |    |    |    |    |-- Value: string (nullable = true)
 |    |    |    |-- SensorId: string (nullable = true)
 |    |-- TransactionId: string (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Source: struct (nullable = true)
 |    |-- Location: struct (nullable = true)
 |    |    |-- Lat: string (nullable = true)
 |    |    |-- Lng: string (nullable = true)
 |    |-- SourceId: string (nullable = true)
 |    |-- Type: string (nullable = true)

I tried the flatten function offered as a solution to several other questions.

This is what I did:

import org.apache.spark.sql.functions.explode

val df2 = spark.read.json("message.json")

val dfPayload = df2.select("Payload.Type", "Payload.TransactionId", "Payload.Sensors")
// explode turns each array element into its own row, but only the columns in this select are kept
val dfSensors = dfPayload
  .select(explode(dfPayload("Sensors")).as("Sensors"))
  .select("Sensors.SensorId", "Sensors.Location.Lat", "Sensors.Location.Lng", "Sensors.Measures")
// output
dfSensors.show()
Sensors
+--------+----------+----------+--------------------+
|SensorId|       Lat|       Lng|            Measures|
+--------+----------+----------+--------------------+
|  S02001|43.5356278|10.2977402|[[DISPL, 2019-07-...|
|  S02002|43.1237824|10.0984567|[[DISPL, 2019-06-...|
|  S02003|43.0598432|10.9152361|[[TEMP, 2019-07-1...|
+--------+----------+----------+--------------------+

val dfMeasures = dfSensors
  .select(explode(dfSensors("Measures")).as("Measures"))
  .select("Measures.Time", "Measures.PropertyId", "Measures.Value")
// output
dfMeasures.show()
Measures
+--------------------+----------+-----+
|                Time|PropertyId|Value|
+--------------------+----------+-----+
|2019-07-17T10:43:...|     DISPL|00.87|
|2019-07-17T10:43:...|      ACCX|00.02|
|2019-07-17T10:43:...|      ACCY|00.09|
|2019-07-17T10:43:...|      ACCZ|00.03|
|2019-07-17T10:43:...|     DISPL|00.92|
|2019-07-17T10:43:...|      ACCX|00.69|
|2019-07-17T10:43:...|      ACCY|00.21|
|2019-06-26T08:25:...|      ACCZ|00.95|
|2019-06-26T08:16:...|     DISPL|00.71|
|2019-07-17T10:43:...|     DISPL|10.43|
|2019-07-17T10:43:...|     DISPL|10.36|
|2019-07-17T10:43:...|      ACCX|10.22|
|2019-07-17T10:43:...|      ACCY|00.69|
|2019-07-17T10:43:...|      ACCZ|00.81|
|2019-07-17T10:43:...|      ACCX|00.22|
|2019-07-17T10:43:...|      ACCY|00.49|
|2019-07-17T10:43:...|      ACCZ|00.74|
|2019-07-17T10:43:...|      ACCX|00.23|
|2019-07-17T10:43:...|      ACCY|00.95|
|2019-07-17T10:43:...|      ACCZ|00.90|
+--------------------+----------+-----+

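I can explode the arrays, but I lose the parent columns.

I need a flattened DataFrame with one record per exploded array element, together with its parent columns, like this: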

+--------+----------+----------+--------------------+----------+-----+
|SensorId|       Lat|       Lng|                Time|PropertyId|Value|
+--------+----------+----------+--------------------+----------+-----+
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|     DISPL|00.87|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCX|00.02|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCY|00.09|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCZ|00.03|
|  S02002|43.1237824|10.0984567|2019-06-26T08:25:...|      ACCZ|00.95|
|  S02003|43.0598432|10.9152361|2019-06-26T08:16:...|     DISPL|00.71|
+--------+----------+----------+--------------------+----------+-----+

Has anyone run into the same problem?

【Question discussion】:

  • You should include all the columns you need in your select. Alternatively, you can use explode with withColumn.
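A minimal sketch of both options from the comment, using the column paths from the schema in the question (the object and method names KeepParentColumns, withSelect and withColumns are hypothetical). In each case the explode sits in a select or withColumn that still carries the parent fields, so SensorId, Lat and Lng are repeated on every measure row, which gives the column layout of the desired output.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

object KeepParentColumns {
  // Option 1: select the parent fields alongside each explode, one nesting level at a time
  def withSelect(df: DataFrame): DataFrame =
    df.select(explode(col("Payload.Sensors")).as("Sensor"))
      .select(col("Sensor.SensorId"), col("Sensor.Location.Lat"), col("Sensor.Location.Lng"),
        explode(col("Sensor.Measures")).as("Measure"))
      .select(col("SensorId"), col("Lat"), col("Lng"),
        col("Measure.Time"), col("Measure.PropertyId"), col("Measure.Value"))

  // Option 2: withColumn adds the exploded element while keeping every existing column
  def withColumns(df: DataFrame): DataFrame =
    df.withColumn("Sensor", explode(col("Payload.Sensors")))
      .withColumn("Measure", explode(col("Sensor.Measures")))
      .select(col("Sensor.SensorId"), col("Sensor.Location.Lat"), col("Sensor.Location.Lng"),
        col("Measure.Time"), col("Measure.PropertyId"), col("Measure.Value"))
}
```

Usage would be along the lines of KeepParentColumns.withSelect(df2).show(). Note that explode drops a sensor whose Measures array is null or empty; explode_outer (Spark 2.2+) keeps such a row with null measure fields instead.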

Tags: json scala dataframe apache-spark flatten


【Solution 1】:

Use this custom function:

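import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

//////////////////////// Flatten schema ////////////////////////
// In Scala 2 an implicit class cannot be top-level, so the helpers live in an object
object DataFrameFlatten {
  implicit class DataFrameHelpers(df: DataFrame) {
    // Replace every (nested) struct column with its leaf fields, named like Parent_Child
    def flattenSchema: DataFrame =
      df.select(flattenStructs(Nil, df.schema): _*)

    // Remove the first match of each given prefix pattern from every column name
    def removeColPrefix(prefixes: String*): DataFrame =
      df.select(df.columns.map { c =>
        col(s"`$c`").as(prefixes.foldLeft(c)((name, p) => name.replaceFirst(p, "")))
      }: _*)
  }

  // Recursively walk the schema: struct fields are expanded; any other type
  // (including arrays, which are NOT exploded) becomes a single column
  def flattenStructs(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.toSeq.flatMap(f => flattenStructs(path :+ f.name, f.dataType))
    case _ => Seq(col(path.map(n => s"`$n`").mkString(".")).as(path.mkString("_")))
  }
}
import DataFrameFlatten._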

Run it like this:

val flattenedReaderDF = df2
  .select("Payload")
  .flattenSchema

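If Payload arrives as a JSON string column and you don't want Spark to infer its schema, you can also pass the schema explicitly, e.g. .select(from_json($"Payload", schema)).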
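One way to skip inference is to declare the schema yourself. The sketch below hand-transcribes the printed schema from the question into StructType/ArrayType; the names MessageSchema, readMessages and parsePayload are hypothetical. readMessages gives the schema to the reader for the whole file, while parsePayload assumes Payload is a JSON string column, which is the case from_json is meant for (when the file is read with spark.read.json as in the question, Payload is already a struct).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object MessageSchema {
  // Every leaf in the printed schema is a nullable string
  private def str(name: String) = StructField(name, StringType, nullable = true)
  private val location = StructType(Seq(str("Lat"), str("Lng")))

  // Payload: Sensors is an array of structs, each holding an array of Measures
  val payload: StructType = StructType(Seq(
    StructField("Sensors", ArrayType(StructType(Seq(
      StructField("Location", location, nullable = true),
      StructField("Measures", ArrayType(StructType(Seq(
        str("PropertyId"), str("Time"), str("Value"))), containsNull = true), nullable = true),
      str("SensorId"))), containsNull = true), nullable = true),
    str("TransactionId"),
    str("Type")))

  // Full message, top-level fields in the same order as the printed schema
  val message: StructType = StructType(Seq(
    StructField("Destination", StructType(Seq(str("DestinationId"), str("Type")))),
    StructField("Header", StructType(Seq(str("MessageTime"), str("MessageVersion")))),
    StructField("Payload", payload),
    StructField("Source", StructType(Seq(StructField("Location", location), str("SourceId"), str("Type"))))))

  // Reading a file: hand the schema to the reader so nothing is inferred
  def readMessages(spark: SparkSession, path: String): DataFrame =
    spark.read.schema(message).json(path)

  // Assumes a string column named "Payload" holding the JSON text of the payload
  def parsePayload(df: DataFrame): DataFrame =
    df.select(from_json(col("Payload"), payload).as("Payload"))
}
```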

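Keep in mind that flattenSchema only expands struct fields: an array column such as Payload.Sensors comes through unchanged as Payload_Sensors and still has to be exploded before its elements become rows. From there you can run removeColPrefix and drop anything you don't want: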

val flattenedReaderDF = df2
  .select(col("Payload"))
  .flattenSchema
  .removeColPrefix("Sensors_", "Location_", "Measures_")
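Because flattenStructs leaves arrays as single columns, a variant that also explodes arrays may be closer to what the question asks for. The sketch below swaps in a different technique from the answer's helper: an iterative flattener (hypothetical name FlattenAll.flattenAll) that, on each pass, explodes one array column with explode_outer or expands all struct columns one level, until only leaf columns remain. Nested names are joined with "_".

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

object FlattenAll {
  @annotation.tailrec
  def flattenAll(df: DataFrame): DataFrame = {
    val fields = df.schema.fields
    fields.find(_.dataType.isInstanceOf[ArrayType]) match {
      case Some(arr) =>
        // One row per element; explode_outer keeps the parent row if the array is null or empty
        flattenAll(df.withColumn(arr.name, explode_outer(col(s"`${arr.name}`"))))
      case None if fields.exists(_.dataType.isInstanceOf[StructType]) =>
        // Expand every struct column by one level, prefixing child names with the parent name
        val cols = fields.toSeq.flatMap { f =>
          f.dataType match {
            case s: StructType =>
              s.fieldNames.toSeq.map(n => col(s"`${f.name}`.`$n`").as(s"${f.name}_$n"))
            case _ => Seq(col(s"`${f.name}`"))
          }
        }
        flattenAll(df.select(cols: _*))
      case None => df // only leaf columns left
    }
  }
}
```

Applied to df2.select("Payload"), this should produce columns such as Payload_Sensors_SensorId, Payload_Sensors_Location_Lat and Payload_Sensors_Measures_Value, which removeColPrefix can then shorten. Exploding one array at a time is fine for nested arrays like Sensors/Measures, but two sibling arrays at the same level would be multiplied into a cartesian product.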

【Discussion】:
