【问题标题】:Pass a struct to an UDAF in spark将结构传递给 Spark 中的 UDAF
【发布时间】:2019-02-04 14:17:40
【问题描述】:

我有以下架构 -

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

如何将 struct 'cars' 传递给 udaf?如果我只想传递汽车子结构,那么 inputSchema 应该是什么。

【问题讨论】:

    标签: scala apache-spark hadoop apache-spark-sql user-defined-functions


    【解决方案1】:

    可以,但 UDAF 的逻辑会有所不同。例如,如果您有两行:

    val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))
    
    val rdd = spark.sparkContext.parallelize(seq)
    

    这里的架构是

    root
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
    

    那么如果你尝试调用聚合:

    val df = seq.toDF
    df.agg(agg0(col("cars")))
    

    您必须更改您的 UDAF 输入架构,例如:

    val carsSchema =
        StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))
    

    在你的 UDAF 中,你必须处理这个改变 inputSchema 的模式:

    override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
    

    在您的更新方法中,您必须处理输入行的格式:

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val i = input.getAs[Array[Array[String]]](0)
      // i here would be [car1,car2,car3],  an array of strings
      buffer(0) = ???
    }
    

    从这里开始,您可以转换 i 以更新缓冲区并完成合并和评估功能。

    【讨论】:

    • 谢谢。这真的很有帮助。除了使用 get* 方法之外,还有什么更好的方法可以从输入中提取值?
    猜你喜欢
    • 1970-01-01
    • 2023-03-15
    • 2019-09-19
    • 1970-01-01
    • 1970-01-01
    • 2012-05-09
    • 1970-01-01
    相关资源
    最近更新 更多