【问题标题】:Spark UDAF dynamic input schema handlingSpark UDAF 动态输入模式处理
【发布时间】:2019-02-06 07:31:24
【问题描述】:

我知道如何将具有内部结构的结构传递给 UDAF - Pass a struct to an UDAF in spark

但是我如何处理内部结构模式未知或动态的情况,因为它会根据数据发生变化。由于输入数据不符合特定模式,因此某些字段可能存在也可能不存在。假设一个数据集有

   root
     |-- id:string (nullable = false)
     |-- age: long (nullable = true)
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
     |-- name: string (nullable = true)

而其他数据集没有car3

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |-- name: string (nullable = true)

如何编写一个 UDAF,它接受基于输入数据而更改的架构。

【问题讨论】:

  • 有趣的问题。您能否将您的汽车模式转换为例如 ArrayType(String) 并调整您的 UDAF 以使用它?然后你可以在其中包含可变数量的元素。
  • 我想出了一种方法来处理它,通过初始化传递架构。

标签: scala apache-spark hadoop apache-spark-sql user-defined-functions


【解决方案1】:

可以在初始化 Udaf 类时动态传递模式 -

    val yetAnotherUdaf = new YetAnotherUdaf(schema)

    case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {

      override def deterministic:Boolean=true
      override def dataType:DataType=schema
      override def inputSchema:StructType=schema
      override def bufferSchema:StructType=schema

      override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
      override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
      override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
      override def evaluate(buffer:Row):StructType={ ??? }
   }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-01-16
    • 2023-03-12
    • 2023-03-21
    • 1970-01-01
    • 2017-08-02
    • 1970-01-01
    • 1970-01-01
    • 2017-12-31
    相关资源
    最近更新 更多