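【Posted】:2019-12-02 17:25:36
【Question】:
I'm new to writing Spark in Scala. I'm trying to flatten a JSON document into a DataFrame so that I can write it to Hadoop, and I'm having trouble with nested JSON that contains arrays. Does anyone have suggestions for flattening this JSON into a DataFrame?
Here is my schema: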
root
 |-- Destination: struct (nullable = true)
 |    |-- DestinationId: string (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- MessageTime: string (nullable = true)
 |    |-- MessageVersion: string (nullable = true)
 |-- Payload: struct (nullable = true)
 |    |-- Sensors: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Location: struct (nullable = true)
 |    |    |    |    |-- Lat: string (nullable = true)
 |    |    |    |    |-- Lng: string (nullable = true)
 |    |    |    |-- Measures: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- PropertyId: string (nullable = true)
 |    |    |    |    |    |-- Time: string (nullable = true)
 |    |    |    |    |    |-- Value: string (nullable = true)
 |    |    |    |-- SensorId: string (nullable = true)
 |    |-- TransactionId: string (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Source: struct (nullable = true)
 |    |-- Location: struct (nullable = true)
 |    |    |-- Lat: string (nullable = true)
 |    |    |-- Lng: string (nullable = true)
 |    |-- SourceId: string (nullable = true)
 |    |-- Type: string (nullable = true)
I tried the flatten function that was offered as a solution to several similar questions.
Here is what I did:
import org.apache.spark.sql.functions.explode // needed for explode(...) below
val df2 = spark.read.json("message.json").toDF()
val dfPayload = df2.select(df2("Payload")).select("Payload.Type","Payload.TransactionId", "Payload.Sensors" )
val dfSensors = dfPayload.select(explode(dfPayload("Sensors"))).toDF("Sensors").select("Sensors.SensorId", "Sensors.Location.Lat", "Sensors.Location.Lng", "Sensors.Measures")
//output
dfSensors.show()
Sensors
+--------+----------+----------+--------------------+
|SensorId|       Lat|       Lng|            Measures|
+--------+----------+----------+--------------------+
| S02001|43.5356278|10.2977402|[[DISPL, 2019-07-...|
| S02002|43.1237824|10.0984567|[[DISPL, 2019-06-...|
| S02003|43.0598432|10.9152361|[[TEMP, 2019-07-1...|
+--------+----------+----------+--------------------+
val dfMeasures = dfSensors.select(explode(dfSensors("Measures"))).toDF("Measures").select("Measures.Time", "Measures.PropertyId", "Measures.Value")
//output
dfMeasures.show()
Measures
+--------------------+----------+-----+
|                Time|PropertyId|Value|
+--------------------+----------+-----+
|2019-07-17T10:43:...|     DISPL|00.87|
|2019-07-17T10:43:...|      ACCX|00.02|
|2019-07-17T10:43:...|      ACCY|00.09|
|2019-07-17T10:43:...|      ACCZ|00.03|
|2019-07-17T10:43:...|     DISPL|00.92|
|2019-07-17T10:43:...|      ACCX|00.69|
|2019-07-17T10:43:...|      ACCY|00.21|
|2019-06-26T08:25:...|      ACCZ|00.95|
|2019-06-26T08:16:...|     DISPL|00.71|
|2019-07-17T10:43:...|     DISPL|10.43|
|2019-07-17T10:43:...|     DISPL|10.36|
|2019-07-17T10:43:...|      ACCX|10.22|
|2019-07-17T10:43:...|      ACCY|00.69|
|2019-07-17T10:43:...|      ACCZ|00.81|
|2019-07-17T10:43:...|      ACCX|00.22|
|2019-07-17T10:43:...|      ACCY|00.49|
|2019-07-17T10:43:...|      ACCZ|00.74|
|2019-07-17T10:43:...|      ACCX|00.23|
|2019-07-17T10:43:...|      ACCY|00.95|
|2019-07-17T10:43:...|      ACCZ|00.90|
+--------------------+----------+-----+
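I can explode the arrays, but I lose the parent columns.
I need a flat DataFrame in which each exploded array element becomes a record that also carries its parent columns, like this: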
+--------+----------+----------+--------------------+----------+-----+
|SensorId|       Lat|       Lng|                Time|PropertyId|Value|
+--------+----------+----------+--------------------+----------+-----+
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|     DISPL|00.87|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCX|00.02|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCY|00.09|
|  S02001|43.5356278|10.2977402|2019-07-17T10:43:...|      ACCZ|00.03|
|  S02002|43.1237824|10.0984567|2019-06-26T08:25:...|      ACCZ|00.95|
|  S02003|43.0598432|10.9152361|2019-06-26T08:16:...|     DISPL|00.71|
+--------+----------+----------+--------------------+----------+-----+
Has anyone run into the same problem?
【Discussion】:
- You should include all the columns you need in the select. Alternatively, you can use withColumn with explode.
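A minimal sketch of what this comment suggests, not a tested answer from the thread. It assumes Spark 2.2 or later running locally. The object name FlattenSensors, the helper flatten, the aliases "Sensor" and "Measure", and the tiny sampleJson record are all made up for illustration; only the field names come from the schema in the question. The idea is to explode Measures with withColumn so that the sensor columns stay on every resulting row, and only then select the flat set of fields.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object FlattenSensors {

  // Made-up sample in the shape of the question's schema (values are placeholders).
  val sampleJson: String =
    """{"Payload":{"Sensors":[""" +
    """{"SensorId":"A1","Location":{"Lat":"1.0","Lng":"2.0"},"Measures":[""" +
    """{"PropertyId":"P1","Time":"t1","Value":"0.1"},""" +
    """{"PropertyId":"P2","Time":"t2","Value":"0.2"}]},""" +
    """{"SensorId":"A2","Location":{"Lat":"3.0","Lng":"4.0"},"Measures":[""" +
    """{"PropertyId":"P1","Time":"t3","Value":"0.3"}]}]}}"""

  // Hypothetical helper: one output row per measure, with the sensor columns repeated.
  def flatten(df: DataFrame): DataFrame =
    df
      // One row per sensor; keep the whole sensor struct under an alias.
      .select(explode(col("Payload.Sensors")).as("Sensor"))
      // withColumn keeps the existing "Sensor" column next to each exploded measure.
      .withColumn("Measure", explode(col("Sensor.Measures")))
      // Pull the parent and child fields up to top-level columns.
      .select(
        col("Sensor.SensorId"),
        col("Sensor.Location.Lat"),
        col("Sensor.Location.Lng"),
        col("Measure.Time"),
        col("Measure.PropertyId"),
        col("Measure.Value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, adjust for a cluster
      .appName("flatten-sensors")
      .getOrCreate()
    import spark.implicits._

    // Each string in the Dataset is parsed as one JSON document.
    val df = spark.read.json(Seq(sampleJson).toDS())
    flatten(df).show(false)
    spark.stop()
  }
}
```

The other option from the comment is equivalent: list the parent columns and the explode call in the same select, for example select(col("Sensor.SensorId"), explode(col("Sensor.Measures")).as("Measure")). Note that explode drops rows whose array is null or empty; explode_outer keeps them with a null child if those sensors should still appear.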
Tags: json scala dataframe apache-spark flatten