【问题标题】:How to add/mutate a Map object in MutableAggregationBuffer in UDAF? / 如何在 UDAF 的 MutableAggregationBuffer 中添加/修改 Map 对象?
【发布时间】:2017-06-01 17:43:10
【问题描述】:

我使用 Spark 2.0.1 和 Scala 2.11。

这是一个与 Spark 中的用户定义聚合函数 (UDAF) 相关的问题。我以 here（原文链接）中提供的示例答案为基础来提出我的问题:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

/**
 * Minimal UDAF that ignores its input and always yields the same pair of arrays.
 *
 * The aggregation callbacks do nothing; only the schemas and the constant result matter.
 */
object DummyUDAF extends UserDefinedAggregateFunction {
  /** One input column, "x", of string type. */
  override def inputSchema: StructType = new StructType().add("x", StringType)

  /** Two array-typed buffer slots; this example never writes to them. */
  override def bufferSchema: StructType =
    new StructType()
      .add("buff", ArrayType(LongType))
      .add("buff2", ArrayType(DoubleType))

  /** Result struct: "xs" holds longs, "ys" holds doubles. */
  override def dataType: StructType =
    new StructType()
      .add("xs", ArrayType(LongType))
      .add("ys", ArrayType(DoubleType))

  override def deterministic: Boolean = true

  // Intentional no-ops: the result below does not depend on any input row.
  override def initialize(buffer: MutableAggregationBuffer): Unit = ()
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ()
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ()

  /** Returns constant arrays as a tuple whose positions line up with the "xs"/"ys" fields. */
  override def evaluate(buffer: Row): (Array[Long], Array[Double]) = {
    val longs = Array(1L, 2L, 3L)
    val doubles = Array(1.0, 2.0, 3.0)
    (longs, doubles)
  }
}

我可以轻松地返回多个 Map 而不是 Array,但无法在 update 方法中修改这些 Map。

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

import scala.collection.mutable.Map

/**
 * Attempted UDAF that keeps two mutable maps in the aggregation buffer and mutates them in place.
 *
 * NOTE: as reported in the question, this compiles but fails at runtime with
 * `ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to
 * scala.collection.mutable.Map`. `Map` in this block is `scala.collection.mutable.Map`
 * (see the import above), but the value read back through `buffer(i)` is evidently an
 * immutable map rather than the instance stored in `initialize`, so the casts fail.
 */
object DummyUDAF extends UserDefinedAggregateFunction {
  // Input rows: x (Double) and y (Int).
  def inputSchema = new StructType().add("x", DoubleType).add("y", IntegerType)
  // Two Map[Double, Int] slots in the aggregation buffer.
  def bufferSchema = new StructType()
    .add("buff", MapType(DoubleType, IntegerType))
    .add("buff2", MapType(DoubleType, IntegerType))

  // Result struct with two map fields, "xs" and "ys".
  def dataType = new StructType()
    .add("xs", MapType(DoubleType, IntegerType))
    .add("ys", MapType(DoubleType, IntegerType))

  def deterministic = true 

  // Stores fresh mutable maps into both slots. The reported error suggests that what is
  // read back later is not this same mutable instance.
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = scala.collection.mutable.Map[Double,Int]()
    buffer(1) = scala.collection.mutable.Map[Double,Int]()
  }

  // Intended to insert x -> y into slot 0 and x*10 -> y*10 into slot 1 in place.
  // The cast to (mutable) Map is one place where the ClassCastException can be raised.
  // NOTE(review): even if the cast succeeded, an in-place mutation is never assigned back
  // via `buffer(i) = ...`, so it is presumably not persisted — confirm against Spark docs.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0).asInstanceOf[Map[Double,Int]](input.getDouble(0)) = input.getInt(1)
    buffer(1).asInstanceOf[Map[Double,Int]](input.getDouble(0)*10) = input.getInt(1)*10
  }

  // Intended to merge buffer2's maps into buffer1's in place; the same cast problem applies.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(0).asInstanceOf[Map[Double,Int]] ++= buffer2(0).asInstanceOf[Map[Double,Int]]
    buffer1(1).asInstanceOf[Map[Double,Int]] ++= buffer2(1).asInstanceOf[Map[Double,Int]]
  }

  // Constant-result variant, left commented out (returning maps this way works).
  //def evaluate(buffer: Row) = (Map(1.0->10,2.0->20), Map(10.0->100,11.0->110))
  // Returns both buffer maps as a tuple matching the "xs"/"ys" fields; also casts to mutable Map.
  def evaluate(buffer: Row) = (buffer(0).asInstanceOf[Map[Double,Int]], buffer(1).asInstanceOf[Map[Double,Int]])
}

这编译得很好,但会出现运行时错误:

// Two (k, v) rows run through the map-based DummyUDAF; running this produces the error below.
val df = Seq(1.0 -> 1, 2.0 -> 2).toDF("k", "v")
val aggregated = df.select(DummyUDAF($"k", $"v"))
aggregated.show(1, false)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 70.0 failed 4 times, most recent failure: Lost task 1.3 in stage 70.0 (TID 204, 10.91.252.25): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map

here（原文链接）讨论的另一个解决方案认为,这可能是由 MapType 和 StructType 引起的问题。但是,当我尝试其中提到的解决方案时,仍然遇到同样的错误。

// Tries the DistinctValues UDAF from the other linked solution (its definition is not shown here).
val distudaf = new DistinctValues
val rows = Seq(
  ("a", "a1"), ("a", "a1"), ("a", "a2"),
  ("b", "b1"), ("b", "b2"), ("b", "b3"), ("b", "b1"), ("b", "b1")
)
val df = rows.toDF("col1", "col2")

df.groupBy("col1").agg(distudaf($"col2").as("DV")).show

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 4 times, most recent failure: Lost task 1.3 in stage 22.0 (TID 100, 10.91.252.25): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map

由于我预计 Map 会很大,复制和重新赋值可能导致性能/内存瓶颈,因此我更倾向于原地修改 Map。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    我对 UDAF 的有限理解是:您应该只对(语义上)想要更新的内容进行赋值,即取出 MutableAggregationBuffer 中已有的内容,与您想要添加的内容合并,然后用 `buffer(i) = ...` 将结果赋值回去(这会在底层调用 update(i: Int, value: Any): Unit)。

    您的代码可能如下所示:

    /**
     * Adds one input row (x, y) by building extended immutable maps and assigning them back.
     *
     * Rows whose x or y is null are skipped: `getDouble`/`getInt` return primitives that cannot
     * represent null, so without this guard such a row would fail the task.
     */
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0) && !input.isNullAt(1)) {
        val x = input.getDouble(0)
        val y = input.getInt(1)

        // Read the current map, extend a copy, and assign it back through buffer(i) = ...
        val newBuffer0 = buffer(0).asInstanceOf[Map[Double, Int]]
        buffer(0) = newBuffer0 + (x -> y)

        val newBuffer1 = buffer(1).asInstanceOf[Map[Double, Int]]
        buffer(1) = newBuffer1 + (x * 10 -> y * 10)
      }
    }
    

    完整的DummyUDAF可能如下:

    import org.apache.spark.sql.expressions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.{Row, Column}
    
    /**
     * Collects two maps per group: "xs" maps each input x to y, "ys" maps x * 10 to y * 10.
     *
     * `Map` here is the default immutable `scala.collection.immutable.Map`. The buffer is never
     * mutated in place; every step builds a new map and assigns it back with `buffer(i) = ...`.
     * Note that this copies the map on every update (see the comment below this answer).
     */
    object DummyUDAF extends UserDefinedAggregateFunction {
      def inputSchema = new StructType().add("x", DoubleType).add("y", IntegerType)
      def bufferSchema = new StructType()
        .add("buff", MapType(DoubleType, IntegerType))
        .add("buff2", MapType(DoubleType, IntegerType))

      def dataType = new StructType()
        .add("xs", MapType(DoubleType, IntegerType))
        .add("ys", MapType(DoubleType, IntegerType))

      def deterministic = true

      /** Starts both buffer slots as empty immutable maps. */
      def initialize(buffer: MutableAggregationBuffer) = {
        buffer(0) = Map.empty[Double, Int]
        buffer(1) = Map.empty[Double, Int]
      }

      /**
       * Adds one input row. Rows whose x or y is null are skipped: `getDouble`/`getInt` return
       * primitives that cannot represent null, so without this guard a null would fail the task.
       */
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0) && !input.isNullAt(1)) {
          val x = input.getDouble(0)
          val y = input.getInt(1)
          buffer(0) = buffer.getAs[Map[Double, Int]](0) + (x -> y)
          buffer(1) = buffer.getAs[Map[Double, Int]](1) + (x * 10 -> y * 10)
        }
      }

      /** Combines two partial results; on a key collision the entry from `buffer2` wins. */
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
        buffer1(0) = buffer1.getAs[Map[Double, Int]](0) ++ buffer2.getAs[Map[Double, Int]](0)
        buffer1(1) = buffer1.getAs[Map[Double, Int]](1) ++ buffer2.getAs[Map[Double, Int]](1)
      }

      /** Returns both maps as a tuple whose positions match the "xs"/"ys" fields of dataType. */
      def evaluate(buffer: Row) = (buffer.getAs[Map[Double, Int]](0), buffer.getAs[Map[Double, Int]](1))
    }
    

    【讨论】:

    • 值得指出的是,这会在每次调用 update 时复制两次数据,因此并不能真正解决问题。
    【解决方案2】:

    来晚了一步。我刚刚发现可以使用

    // Buffer schema with a single slot declared as an ObjectType holding a mutable.Map.
    // NOTE(review): a commenter reports this fails at runtime with
    // "Unsupported data type ObjectType(interface scala.collection.mutable.Map)", so
    // ObjectType may not be accepted in a UDAF buffer schema — verify on your Spark version.
    override def bufferSchema: StructType = StructType(List(
        StructField("map", ObjectType(classOf[mutable.Map[String, Long]]))
    ))
    

    来在缓冲区中使用 mutable.Map。

    【讨论】:

    • 你能发布一个完整的例子吗?我执行了 import scala.collection.mutable 并尝试了您的解决方案,但在运行时出现错误:org.apache.spark.SparkException: Unsupported data type ObjectType(interface scala.collection.mutable.Map)
    猜你喜欢
    • 2016-04-20
    • 1970-01-01
    • 1970-01-01
    • 2019-05-23
    • 1970-01-01
    • 2022-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多