【问题标题】:Efficient PairRDD operations on DataFrame with Spark SQL GROUP BY使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作
【发布时间】:2015-08-03 06:27:43
【问题描述】:

这个问题是关于聚合操作时DataFrameRDD 之间的对偶性。在 Spark SQL 中,可以使用表生成 UDF 进行自定义聚合,但创建其中之一的用户友好性通常明显低于使用 RDD 可用的聚合函数,尤其是在不需要表输出的情况下。

是否有一种有效的方法可以将 aggregateByKey 等配对 RDD 操作应用于已使用 GROUP BY 分组或使用 ORDERED BY 排序的 DataFrame?

通常,需要一个显式的map 步骤来创建键值元组,例如dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row).aggregateByKey(...)。这可以避免吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd


    【解决方案1】:

    不是真的。虽然DataFrames 可以转换为RDDs,反之亦然,但这是相对复杂的操作,DataFrame.groupBy 之类的方法与RDD 上的对应方法的语义不同。

    你能得到的最接近的东西是在 Spark 1.6.0 中引入的a new DataSet API。它提供了与DataFramesGroupedDataset 类的更紧密集成,并具有自己的一组方法,包括reducecogroupmapGroups

    case class Record(id: Long, key: String, value: Double)
    
    val df = sc.parallelize(Seq(
        (1L, "foo", 3.0), (2L, "bar", 5.6),
        (3L, "foo", -1.0), (4L, "bar", 10.0)
    )).toDF("id", "key", "value")
    
    val ds = df.as[Record]
    ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show
    
    // +-----+-----------+
    // |   _1|         _2|
    // +-----+-----------+
    // |[bar]|[2,bar,5.6]|
    // |[foo]|[1,foo,3.0]|
    // +-----+-----------+
    

    在某些特定情况下,可以利用Orderable 语义对使用structsarrays 的数据进行分组和处理。你会在SPARK DataFrame: select the first row of each group中找到一个例子

    【讨论】:

    • 是的,数据集看起来确实很有趣,但 Spark 1.6.0 中的支持仍然有很多缺陷:它们是一个实验性功能。
    • 它是 :) 矛盾的是,Spark DataFrames 在 PySpark 上的表现要比 Scala 好得多。不幸的是,JVM 和 Python 之间的跳跃使事情变得非常昂贵。
    • @zero323 ,一直在测试您的示例,但收到错误 error: value reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset 。我错过了进口吗? (我设法找到的只有 reduce 与 RDD 相关)
    猜你喜欢
    • 2012-06-30
    • 2017-09-12
    • 1970-01-01
    • 1970-01-01
    • 2021-08-10
    • 2021-02-23
    • 2021-11-20
    • 2021-04-25
    • 1970-01-01
    相关资源
    最近更新 更多