【Question title】: How to parse each row JSON to columns of Spark 2 DataFrame? [duplicate]
【Posted】: 2018-07-16 07:27:16
【Question】:

In my Spark (2.2) DataFrame, each row is a JSON string:

df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]

df.show()
//output
//+--------------+
//|         value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|

I want to expand each JSON row into columns to get this result:

key   event_name     timestamp
111   page-visited   1517814315
...

I tried this approach, but it did not give me the expected result:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, true),
  StructField("event_name", StringType, true),
  StructField("timestamp", IntegerType, true)
))

val result = df.withColumn("value", from_json($"value", schema))

The resulting schema is:

result.printSchema()
root
 |-- value: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- timestamp: integer (nullable = true)

It should look like this:

result.printSchema()
root
 |-- key: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- timestamp: integer (nullable = true)

【Comments】:

    Tags: json scala apache-spark apache-spark-sql


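    【Solution 1】:

    from_json produces a single struct column. Append select($"value.*") at the end to expand the fields of that struct into separate top-level columns: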

    val result = df.withColumn("value", from_json($"value", schema)).select($"value.*")
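
For anyone who wants to try the answer outside of spark-shell, here is a minimal self-contained sketch. The object name `JsonRowsToColumns`, the helper `flatten`, the local-mode session settings, and the second sample row are assumptions made for illustration; only the schema, the first sample row, and the `from_json` plus `value.*` technique come from the question and answer. It uses `col("value.*")` instead of `$"value.*"` so the helper does not depend on `spark.implicits._`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object JsonRowsToColumns {

  // Schema from the question: three nullable fields.
  val schema: StructType = StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("event_name", StringType, nullable = true),
    StructField("timestamp", IntegerType, nullable = true)
  ))

  // Hypothetical helper: parse the JSON string column "value" into a struct,
  // then expand the struct's fields into top-level columns.
  def flatten(df: DataFrame): DataFrame =
    df.withColumn("value", from_json(col("value"), schema))
      .select(col("value.*"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("json-rows-to-columns")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One JSON document per row in a single string column named "value".
    // The first row is from the question; the second row's values are made up.
    val df = Seq(
      """{"key":"111","event_name":"page-visited","timestamp":1517814315}""",
      """{"key":"222","event_name":"page-visited","timestamp":1517814316}"""
    ).toDF("value")

    val result = flatten(df)
    result.printSchema() // key, event_name, timestamp as top-level columns
    result.show(false)

    spark.stop()
  }
}
```

With an explicit schema, rows that fail to parse as JSON come back as null rather than raising an error, so it may be worth checking the result for null rows after flattening.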
    

    【Comments】:
