【Posted】: 2022-11-22 23:12:48
【Question】:
I am reading from Kafka with Spark Structured Streaming. The incoming Kafka messages have the following JSON format:
[
{
"customer": "Jim",
"sex": "male",
"country": "US"
},
{
"customer": "Pam",
"sex": "female",
"country": "US"
}
]
I have defined the following schema to parse it:
val schemaAsJson = ArrayType(StructType(Seq(
StructField("customer",StringType,true),
StructField("sex",StringType,true),
StructField("country",StringType,true))),true)
My code looks like this:
df.select(from_json($"col", schemaAsJson) as "json")
.select("json.customer","json.sex","json.country")
The current output looks like this:
+--------------+----------------+----------------+
|      customer|             sex|         country|
+--------------+----------------+----------------+
| [Jim, Pam]| [male, female]| [US, US]|
+--------------+----------------+----------------+
Expected output:
+--------------+----------------+----------------+
| customer| sex| country|
+--------------+----------------+----------------+
| Jim| male| US|
| Pam| female| US|
+--------------+----------------+----------------+
How can I split the array of structs into individual rows, as shown above? Can anyone help?
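One possible approach, sketched here rather than confirmed by the original post, is to wrap the parsed array in `explode` so that each struct in the array becomes its own row before the fields are selected. The object name `ExplodeStructArray`, the helper `explodeCustomers`, the local `SparkSession`, and the static one-row DataFrame standing in for the Kafka stream are all assumptions made for illustration; only the schema and the column name `col` come from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplodeStructArray {
  // Schema from the question: a JSON array of customer structs.
  val schemaAsJson: ArrayType = ArrayType(StructType(Seq(
    StructField("customer", StringType, true),
    StructField("sex", StringType, true),
    StructField("country", StringType, true))), true)

  // Hypothetical helper: parses the string column "col" and emits
  // one row per array element instead of one row per message.
  def explodeCustomers(df: DataFrame): DataFrame =
    df.select(explode(from_json(col("col"), schemaAsJson)) as "json")
      .select("json.customer", "json.sex", "json.country")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session and a static DataFrame replace the Kafka source.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      """[{"customer":"Jim","sex":"male","country":"US"},{"customer":"Pam","sex":"female","country":"US"}]"""
    ).toDF("col")

    explodeCustomers(df).show()
    spark.stop()
  }
}
```

The only change relative to the code in the question is the `explode(...)` call around `from_json`. Without it, `json` is an array column, so `json.customer` collects the field across all elements into one array per message; with it, `json` is a single struct per row.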
【Discussion】:
Tags: json dataframe scala apache-spark apache-kafka