Posted: 2018-08-16 08:19:46
Question:
I have a JSON data source. Here is an example of one row:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
]
}
So when it is loaded into a Spark DataFrame, the schema is:
root
|-- PrimaryAcctNumber: string (nullable = true)
|-- AdditionalData: array (nullable = true)
| |-- element: struct (containsNull = true)
I want to use Spark to create a new column named LongestAddressOfPrimaryAccount, based on the column AdditionalData (ArrayType[StructType]), using the following logic:
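- Iterate over AdditionalData
  - If an element's AccountNumber property equals the row's PrimaryAcctNumber, the value of LongestAddressOfPrimaryAccount is the longest string in that element's Addrs array
  - If no element's AccountNumber property equals PrimaryAcctNumber, the value is "N/A"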
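To make the rule concrete, here is a minimal plain-Scala sketch of the per-row logic, with no Spark involved. The case classes `Additional` and `Record`, the wrapper object, and the function name are hypothetical and only mirror the JSON shape shown in the question. The comparison is case-insensitive, which is an assumption: the sample row has "account1" as PrimaryAcctNumber but "Account1" as AccountNumber, and the expected output still treats them as a match.

```scala
object LongestAddress {
  // Hypothetical types mirroring the JSON row in the question.
  final case class Additional(Addrs: Seq[String], AccountNumber: String, IP: Long)
  final case class Record(PrimaryAcctNumber: String, AdditionalData: Seq[Additional])

  // Returns the longest address of the first element whose AccountNumber
  // matches PrimaryAcctNumber (ignoring case, an assumption), or "N/A"
  // when nothing matches or the matching element has no addresses.
  def longestAddressOfPrimaryAccount(r: Record): String =
    r.AdditionalData
      .find(_.AccountNumber.equalsIgnoreCase(r.PrimaryAcctNumber))
      .flatMap(_.Addrs.reduceOption((a, b) => if (b.length > a.length) b else a))
      .getOrElse("N/A")
}
```

This sketch does not handle null fields; real JSON input may need that.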
So for the row given above, the expected output is:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
],
"LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}
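Using a UDF or the map function would work, but that is not Spark best practice.
Is it possible using only Spark's built-in functions? Something like this (pseudocode):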
sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
  longest(
    get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
  ),
  lit("N/A")))
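One possible way to express this without a UDF, assuming Spark 2.4 or later, is to use the SQL higher-order functions `filter`, `transform`, and `aggregate` through `expr`. The sketch below is one option under that assumption, not a definitive answer. The object name, the function name `withLongestAddress`, and the parameter `sourceDf` are hypothetical, and the `lower(...)` comparison is an assumption made because the sample data differs in case ("account1" vs "Account1").

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, expr, lit}

object LongestAddressColumn {
  // Adds LongestAddressOfPrimaryAccount using only built-in SQL functions.
  def withLongestAddress(sourceDf: DataFrame): DataFrame = {
    val longest = expr(
      """aggregate(
        |  flatten(transform(
        |    filter(AdditionalData, x -> lower(x.AccountNumber) = lower(PrimaryAcctNumber)),
        |    x -> x.Addrs)),
        |  cast(null as string),
        |  (acc, a) -> if(acc is null or length(a) > length(acc), a, acc))""".stripMargin)
    // aggregate yields null when no element matches, so coalesce supplies "N/A".
    sourceDf.withColumn("LongestAddressOfPrimaryAccount", coalesce(longest, lit("N/A")))
  }
}
```

Here `filter` keeps the elements whose AccountNumber matches, `transform` plus `flatten` collects their Addrs into a single array, and `aggregate` folds that array down to its longest string. If several elements match, their addresses are pooled, which differs from taking only the first match.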
Comments:
Tags: scala apache-spark apache-spark-sql apache-spark-dataset