【问题标题】:Spark DataFrame Aggregation based on two or more Columns基于两列或多列的 Spark DataFrame 聚合
【发布时间】:2017-09-19 22:01:18
【问题描述】:

我想为一些基于多列的自定义聚合编写一个 UDAF。一个简单的示例是具有两列 c1 和 c2 的数据框。对于每一行,我取 c1 和 c2 的最大值(我们称之为 cmax),然后我取 cmax 的总和。

当我调用 df.agg() 时,我似乎无法将两列或更多列传递给包括 UDAF 在内的任何聚合方法。第一个问题,是真的吗?

对于这个简单的示例,我可以创建另一个名为 cmax 的列,并在 cmax 上进行聚合。但实际上,我需要基于 N 个列组合进行聚合,结果将是一个大小为 N 的集合。我想在我的 UDAF 的更新方法中循环组合。因此它需要 N 个中间列,这对我来说似乎不是一个干净的解决方案。第二个问题,我想知道创建中间列是否是这样做的方法,或者是否有更好的解决方案。

我注意到在 RDD 中,问题要容易得多。我可以将整个记录传递给我的聚合函数,并且我可以访问所有数据字段。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    您可以在 UDAF 中使用尽可能多的列,因为它的 apply 函数的签名接受多个 Columns(来自它的源代码)。

     def apply(exprs: Column*): Column
    

    您只需要确保 inputSchema 返回一个 StructType 反映您想要作为 UDAF 输入使用的列。

    对于 c1c2 列,您的 UDAF 必须使用以下架构实现 inputSchema

    def inputSchema: StructType = StructType(Array(StructField("c1", DoubleType), StructField("c2", DoubleType)))
    

    但是,如果您想要更通用的解决方案,您始终可以使用允许返回正确 inputSchema 的参数初始化自定义 UDAF。请参阅下面的示例,该示例允许在构造时定义任意 StructType注意,我们不会验证 StructType 是否属于 DoubleType)。

    class MyMaxUDAF(schema: StructType) extends UserDefinedAggregateFunction {
    
      def inputSchema: StructType = this.schema
    
      def bufferSchema: StructType = StructType(Array(StructField("maxSum", DoubleType)))
    
      def dataType: DataType = DoubleType
    
      def deterministic: Boolean = true
    
      def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
    
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + Array.range(0, input.length).map(input.getDouble).max
      }
    
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = buffer2 match {
        case Row(buffer2Sum: Double) => buffer1(0) = buffer1.getDouble(0) + buffer2Sum
      }
    
      def evaluate(buffer: Row): Double = buffer match {
        case Row(totalSum: Double) => totalSum
      }
    
    }
    

    您的 DataFrame 包含用于聚合的值和键。

    val df = spark.createDataFrame(Seq(
      Entry(0, 1.0, 2.0, 3.0), Entry(0, 3.0, 1.0, 2.0), Entry(1, 6.0, 2.0, 2)
    ))
    df.show
    
    
    +-------+---+---+---+
    |groupMe| c1| c2| c3|
    +-------+---+---+---+
    |      0|1.0|2.0|3.0|
    |      0|3.0|1.0|2.0|
    |      1|6.0|2.0|2.0|
    +-------+---+---+---+
    

    使用 UDAF,我们预计 max 的总和为 6.0 和 6.0

    val fields = Array("c1", "c2", "c3")
    val struct = StructType(fields.map(StructField(_, DoubleType)))
    val myMaxUDAF: MyMaxUDAF = new MyMaxUDAF(struct)
    df.groupBy("groupMe").agg(myMaxUDAF(fields.map(df(_)):_*)).show
    
    
    +-------+---------------------+
    |groupMe|mymaxudaf(c1, c2, c3)|
    +-------+---------------------+
    |      0|                  6.0|
    |      1|                  6.0|
    +-------+---------------------+
    

    有一个很好的关于 UDAF 的教程。不幸的是,它们没有涵盖多个论点。

    https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

    【讨论】:

    • 不确定 22 个参数的限制。你在哪里找到的?
    • Scala 每个函数最多有 22 个参数。不确定,但 UDAF 的签名将每列读取为单个参数,而不是列列表。需要验证,但现在我在打电话。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-28
    • 2015-05-17
    • 1970-01-01
    • 2015-08-11
    • 2021-11-15
    • 2018-10-27
    相关资源
    最近更新 更多