【问题标题】:UDAF in Spark with multiple input columns具有多个输入列的 Spark 中的 UDAF
【发布时间】:2016-05-12 16:57:33
【问题描述】:

我正在尝试开发一个用户定义的聚合函数,该函数计算一行数字的线性回归。我已经成功完成了计算均值置信区间的 UDAF(经过大量试验和错误以及 SO!)。

这就是我已经实际运行的内容:

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, LongType, DataType, ArrayType}

case class RegressionData(intercept: Double, slope: Double)

class Regression  {

  import org.apache.commons.math3.stat.regression.SimpleRegression

  def roundAt(p: Int)(n: Double): Double = { val s = math pow (10, p); (math round n * s) / s }

  def getRegression(data: List[Long]): RegressionData = {
    val regression: SimpleRegression  = new SimpleRegression()
    data.view.zipWithIndex.foreach { d =>
        regression.addData(d._2.toDouble, d._1.toDouble)
    }

    RegressionData(roundAt(3)(regression.getIntercept()), roundAt(3)(regression.getSlope()))
  }
}


class UDAFRegression extends UserDefinedAggregateFunction {

  import java.util.ArrayList

  def deterministic = true

  def inputSchema: StructType =
    new StructType().add("units", LongType)

  def bufferSchema: StructType =
    new StructType().add("buff", ArrayType(LongType))


  def dataType: DataType =
    new StructType()
      .add("intercept", DoubleType)
      .add("slope", DoubleType)

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, new ArrayList[Long]())
  }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
    val longList: ArrayList[Long]  = new ArrayList[Long](buffer.getList(0))
    longList.add(input.getLong(0));
    buffer.update(0, longList);

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val longList: ArrayList[Long] = new ArrayList[Long](buffer1.getList(0))
    longList.addAll(buffer2.getList(0))

    buffer1.update(0, longList)
  }


  def evaluate(buffer: Row) = {
    import scala.collection.JavaConverters._
    val list = buffer.getList(0).asScala.toList
    val regression = new Regression
    regression.getRegression(list)
  }
}

但是数据集不是按顺序排列的,这在这里显然非常重要。因此,我需要第二个参数 regression($"longValue", $"created_day") 而不是 regression($"longValue")created_daysql.types.DateType

我对 DataTypes、StructTypes 和诸如此类的东西感到很困惑,并且由于网络上缺乏示例,我在这里的试用和订购尝试被卡住了。

我的bufferSchema 会是什么样子?

在我的情况下,这些 StructTypes 是开销吗? (可变的)Map 不会做吗? MapType 实际上是不可变的吗?作为缓冲区类型,这不是毫无意义吗?

我的inputSchema 会是什么样子?

这是否必须与我在update() 中通过input.getLong(0) 检索到的类型相匹配?

有没有标准的方法来重置initialize()中的缓冲区

我见过buffer.update(0, 0.0)(显然,它包含双打),buffer(0) = new WhatEver(),我认为甚至是buffer = Nil。这些有什么不同吗?

如何更新数据?

上面的例子似乎过于复杂。我期待能够做某事。喜欢buffer += input.getLong(0) -> input.getDate(1)。 我可以期望以这种方式访问​​输入吗

如何合并数据?

我可以把功能块留空吗 def merge(…) = {}?

evaluate() 中对缓冲区进行排序的挑战是……。我应该能够弄清楚,尽管我仍然对你们如何做到这一点的最优雅的方式感兴趣(在很短的时间内)。

额外问题:dataType 扮演什么角色?

我返回一个案例类,而不是 dataType 中定义的 StructType,这似乎不是问题。还是因为它恰好与我的案例类匹配而有效?

【问题讨论】:

    标签: scala apache-spark user-defined-functions


    【解决方案1】:

    也许这会解决问题。

    UDAF APIs 工作于 DataFrame Columns。您所做的一切都必须像DataFrame 中的所有其他Columns 一样序列化。正如您所注意到的,唯一的支持MapType 是不可变的,因为这是您可以放入Column 的唯一内容。使用不可变集合,您只需创建一个包含旧集合和一个值的新集合:

    var map = Map[Long,Long]()
    map = map + (0L -> 1234L)
    map = map + (1L -> 4567L)
    

    是的,就像使用任何DataFrame 一样,您的类型必须匹配。当真的有LongType 时,请执行buffer.getInt(0) 会有问题。

    没有重置缓冲区的标准方法,因为除了对您的数据类型/用例有意义的方法之外。也许零实际上是上个月的平衡;也许零是另一个数据集的运行平均值;也许零是null 或空字符串,或者零可能真的是零。

    merge 是一种仅在某些情况下发生的优化,如果我没记错的话——如果情况允许的话,SQL 优化可以使用的一种小计方式。我只是使用与update 相同的函数。

    case class 将自动转换为适当的架构,因此对于附加问题,答案是肯定的,因为架构匹配。把dataType改成不匹配,会报错。

    【讨论】:

    • 我会在再试一次时尝试使用它,也许它比我看起来更简单!
    • 我的理解是 merge 将当前缓冲区添加到另一个实例的输出行。如果输出行可以用作输入行,则只能将更新函数用于合并,它可以用于求和,但不能用于计数。列的含义、数量和类型必须匹配。
    猜你喜欢
    • 2017-08-02
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    • 1970-01-01
    • 2016-03-14
    • 1970-01-01
    • 1970-01-01
    • 2019-03-06
    相关资源
    最近更新 更多