【问题标题】:Trouble getting Spark aggregators to work使 Spark 聚合器无法正常工作
【发布时间】:2018-09-01 14:31:54
【问题描述】:

我想在 Scala Spark 中试用聚合器,但我似乎无法让它们同时使用 select 函数和 groupBy/agg 函数工作(在我当前的实现中,agg 函数无法编译) .我的聚合器写在下面,应该是不言自明的。

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

/** Stores the number of true counts (tc) and false counts (fc) */
case class Counts(var tc: Long, var fc: Long)

/** Count the number of true and false occurances of a function */
class BooleanCounter[A](f: A => Boolean) extends Aggregator[A, Counts, Counts] with Serializable {
  // Initialize both counts to zero
  def zero: Counts = Counts(0L, 0L) 
  // Sum counts for intermediate value and new value
  def reduce(acc: Counts, other: A): Counts = { 
    if (f(other)) acc.tc += 1 else acc.fc += 1
    acc 
  }
  // Sum counts for intermediate values
  def merge(acc1: Counts, acc2: Counts): Counts = { 
    acc1.tc += acc2.tc
    acc1.fc += acc2.fc
    acc1
  }
  // Return results
  def finish(acc: Counts): Counts = acc 
  // Encoder for intermediate value type
  def bufferEncoder: Encoder[Counts] = Encoders.product[Counts]
  // Encoder for return type
  def outputEncoder: Encoder[Counts] = Encoders.product[Counts]
}

下面是我的测试代码。

val ds: Dataset[Employee] = Seq(
  Employee("John", 110),
  Employee("Paul", 100),
  Employee("George", 0), 
  Employee("Ringo", 80) 
).toDS()

val salaryCounter = new BooleanCounter[Employee]((r: Employee) => r.salary < 10).toColumn
// Usage works fine 
ds.select(salaryCounter).show()
// Causes an error
ds.groupBy($"name").agg(salaryCounter).show()

salaryCounter 的第一次使用工作正常,但第二次使用会导致以下编译错误。

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Employee 

Databricks 有一个相当复杂的tutorial,但似乎是 Spark 2.3。还有 this 旧教程使用 Spark 1.6 中的实验性功能。

【问题讨论】:

  • 操作:ds.groupBy($"name").agg(salaryCounter).show() 有什么意义?早期聚合的输出只返回一个包含 2 列 (tc, fc) 和 1 行的数据集。您希望通过此操作获得什么输出?应用于ds.groupBy($"name") 时的UDAF 显然不起作用,因为在这种情况下提供给UDAF 的输入不是Employee
  • 理想情况下,这将生成一个数据集,其中每一行对应一个名称,每个名称的列 (tc, fc) 对应于具有给定名称的员工数量分别低于或高于 10 美元.如您所说,UDAF 并未应用于ds.groupBy($"name"),而是传递给.agg 函数。请参阅我链接的教程,因为它们具有似乎有效的示例用法。

标签: scala apache-spark apache-spark-sql aggregate-functions user-defined-functions


【解决方案1】:

您错误地混合了“静态类型”和“动态类型”API。要使用以前的版本,您应该在KeyValueGroupedDataset 上调用agg,而不是RelationalGroupedDataset

ds.groupByKey(_.name).agg(salaryCounter)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-01-01
    • 1970-01-01
    • 2014-04-04
    • 2021-01-25
    • 2014-06-08
    • 2015-07-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多