【问题标题】:Split array of structs from JSON into Dataframe rows in SPARK将结构数组从 JSON 拆分为 SPARK 中的 Dataframe 行
【发布时间】:2022-11-22 23:12:48
【问题描述】:

我正在通过 Spark Structured 流媒体阅读 Kafka。输入的 Kafka 消息具有以下 JSON 格式:

[
  {
    "customer": "Jim",
    "sex": "male",
    "country": "US"  
  },
  {
    "customer": "Pam",
    "sex": "female",
    "country": "US"
  } 
] 

我有如下定义的模式来解析它:

val schemaAsJson = ArrayType(StructType(Seq(
      StructField("customer",StringType,true),
      StructField("sex",StringType,true),
      StructField("country",StringType,true))),true) 

我的代码看起来像这样,

df.select(from_json($"col", schemaAsJson) as "json")
  .select("json.customer","json.sex","json.country")

当前输出看起来像这样,

+--------------+----------------+----------------+
|      customer|             sex|country         |
+--------------+----------------+----------------+
|    [Jim, Pam]|  [male, female]|        [US, US]|
+--------------+----------------+----------------+

预期输出:

+--------------+----------------+----------------+
|      customer|             sex|         country|
+--------------+----------------+----------------+
|           Jim|            male|              US|
|           Pam|          female|              US|
+--------------+----------------+----------------+

如何将结构数组拆分成单独的行,如上所示?有人可以帮忙吗?

【问题讨论】:

    标签: json dataframe scala apache-spark apache-kafka


    【解决方案1】:

    选择前需要分解列。

    df.select(explode_outer(from_json($"value", schemaAsJson)) as "json")
    .select("json.customer","json.sex","json.country").show()
    

    【讨论】:

    • 我尝试过这个。我收到此错误:线程“主”中的异常 org.apache.spark.sql.AnalysisException:无法解析给定输入列的“json.customer”:[col];
    • 更新了答案,现在检查
    • 对不起,什么是 df.s.?
    • 抱歉,打错了
    猜你喜欢
    • 2020-12-06
    • 2019-08-26
    • 1970-01-01
    • 2021-07-29
    • 1970-01-01
    • 2020-09-07
    • 1970-01-01
    • 2018-12-09
    • 1970-01-01
    相关资源
    最近更新 更多