【问题标题】:How to process complex data in ArrayType using Spark functions如何使用 Spark 函数处理 ArrayType 中的复杂数据
【发布时间】:2018-08-16 08:19:46
【问题描述】:

有一个json数据源。以下是一行的示例:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ]
}

所以当加载它以触发 DataFrame 时,架构是:

root
 |-- PrimaryAcctNumber: string (nullable = true)
 |-- AdditionalData: array (nullable = true)
 |    |-- element: struct (containsNull = true)

我想使用 Spark 基于列 AdditionalData (ArrayType[StructType]) 使用以下逻辑创建一个名为 LongestAddressOfPrimaryAccount 的新列:

  • 迭代附加数据
    • 如果AccountNumber属性等于行的PrimaryAcctNumber,则LongestAddressOfPrimaryAccount的值将是Addrs数组中最长的字符串
    • 如果没有AccountNumber 属性等于PrimaryAcctNumber,则值为“N/A”

所以对于上面给定的行,预期的输出是:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ],
  "LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}

使用 UDF 或 map 函数是可行的。但这不是 Spark 的最佳实践。

只使用 Spark 函数是否可行?比如:

sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
  longest(
    get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
  )
  , lit("N/A")))

【问题讨论】:

    标签: scala apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    如果您的 spark 版本为 2.2 或更低版本,则必须为您的要求编写一个 udf 函数,因为 内置函数更复杂和更慢在你必须组合更多内置函数的意义上慢)比使用udf 函数。而且我不知道有这样的内置功能可以直接满足您的要求。

    Databricks 团队正在开发 Nested Data Using Higher Order Functions in SQL,这些将在下一个版本中发布。

    在此之前,如果您不想让您的工作变得复杂,则必须编写 udf 函数。

    【讨论】:

    • 我肯定会使用最新的 Spark 发布版本。感谢您的指导。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-02-02
    • 2021-05-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多