【问题标题】:How to use GroupByKey in Spark to calculate nonlinear-groupBy task如何在 Spark 中使用 GroupByKey 计算非线性 groupBy 任务
【发布时间】:2019-01-12 21:33:01
【问题描述】:

我有一张桌子看起来像

Time  ID  Value1  Value2 
 1    a     1      4
 2    a     2      3
 3    a     5      9
 1    b     6      2
 2    b     4      2 
 3    b     9      1
 4    b     2      5  
 1    c     4      7 
 2    c     2      0

以下是任务和要求:

  1. 我想将列ID设置为键,而不是时间列,但我不想删除时间列。 Spark中有设置主键的方法吗?

  2. 聚合函数是非线性的,这意味着你不能使用“reduceByKey”。在计算之前,所有数据必须洗牌到一个节点。例如,聚合函数可能看起来像求和值的根 N,其中 N 是每个 ID 的记录数(计数):

    output = root(sum(value1), count(*)) + root(sum(value2), count(*)) 
    

为了明确,对于ID="a",聚合输出值应该是

 output = root(1 + 2 + 5, 3) + root(4 + 3 + 9, 3)    

后面的 3 是因为我们有 3 条记录。对于 ID='b',它是:

 output = root(6 + 4 + 9 + 2, 4) + root(2 + 2 + 1 + 5, 4) 

组合是非线性的。因此,为了得到正确的结果,所有具有相同“ID”的数据必须在一个执行器中。

我在 Spark 2.0 中检查了 UDF 或聚合器。根据我的理解,他们都假设“线性组合”

有没有办法处理这种非线性组合计算?尤其是利用 Spark 的并行计算优势?

【问题讨论】:

  • 你可以做一个reduceByKey,产生总和和值的计数,然后对它做root([sum], [count]),不是吗?

标签: scala apache-spark apache-spark-sql aggregation


【解决方案1】:

您使用的功能不需要任何特殊处理。您可以将纯 SQL 与 join 一起使用

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, lit, sum, pow}

def root(l: Column, r: Column) = pow(l, lit(1) / r)

val out = root(sum($"value1"), count("*")) + root(sum($"value2"), count("*"))

df.groupBy("id").agg(out.alias("outcome")).join(df, Seq("id"))

或窗口函数:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("id")
val outw = root(sum($"value1").over(w), count("*").over(w)) + 
           root(sum($"value2").over(w), count("*").over(w))

df.withColumn("outcome", outw)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-01-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多