【问题标题】:spark udaf update array type of tuplespark udaf 更新数组类型的元组
【发布时间】:2017-01-25 22:38:37
【问题描述】:

我正在使用 Scala + Spark 2.0 并尝试编写一个 UDAF,其中包含一个元组数组作为其内部缓冲区以及它的返回类型: ...

def bufferSchema = new StructType().add("midResults", ArrayType(  StructType(Array(StructField("a", DoubleType),StructField("b", DoubleType))) ))

def dataType: DataType = ArrayType(  StructType(Array(StructField( "a", DoubleType),StructField("b", DoubleType))) )

这就是我更新缓冲区的方式

def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = buffer.getAs[mutable.WrappedArray[(Double,Double)]](3) ++ Array((3.0,4.0))
}

但我得到以下异常:

java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

如果我有一个简单的 Double 数组,则此模式有效..

【问题讨论】:

    标签: arrays scala apache-spark aggregate-functions user-defined-functions


    【解决方案1】:

    java.lang.ArrayStoreException"thrown to indicate that an attempt has been made to store the wrong type of object into an array of objects" 并且预期的 because a local Scala type for StructType is o.a.s.sql.Row 不是元组。换句话说,您应该使用Seq[Row] 作为缓冲区字段并使用Row 作为值。

    注意事项

    • 循环调用++ 可能不是最好的主意。
    • 如果您认为创建 UDAF 有点过时,因为 Spark 2.0 collect_list 支持复杂类型。
    • 可以说AggregatorsUserDefinedAggregateFunctions 更加用户友好。

    【讨论】:

    • 不确定我是否同意它在这种情况下尝试存储 Row,但我肯定会切换到 collect_list 和 Aggregators(我同意 UDAF 有点笨拙)。谢谢指点。
    • 有点轻描淡写 :) Aggregators 仍然很冗长,但写起来更愉快。关于行,您不能拥有TupleN 类型的架构。此处的反射仅以一种方式起作用。所以缓冲区应该包含Seq[Row],函数应该期望Rows作为输入。
    • 在尝试了示例后,我遇到了查找自动编码器的问题。我添加了“import sparkSession.implicits._”还有其他指针吗?
    • Aggregators?请记住,API 在 1.6 和 2.0 之间发生了变化。我在这里有一个完整的 2.0 示例stackoverflow.com/a/32101530/1560062。当然,您应该首先将DataFrame 转换为一些静态类型的数据集。否则你将不得不处理 RowEncoders:stackoverflow.com/q/39433419/1560062
    • 我不明白这如何解决尝试在缓冲区中使用元组...
    猜你喜欢
    • 2015-12-19
    • 2018-03-10
    • 2023-04-03
    • 2023-03-21
    • 1970-01-01
    • 1970-01-01
    • 2016-08-01
    • 2018-03-22
    • 2017-06-12
    相关资源
    最近更新 更多