【问题标题】:Should I choose RDD over DataSet/DataFrame if I intend to perform a lot of aggregations by key?如果我打算按键执行大量聚合,我应该选择 RDD 而不是 DataSet/DataFrame 吗?
【发布时间】:2019-07-07 07:46:40
【问题描述】:

我有一个用例,我打算在聚合列时按键分组。我正在使用数据集并尝试通过使用groupByagg 来实现这些操作。比如下面这个场景

case class Result(deptId:String,locations:Seq[String])
case class Department(deptId:String,location:String)

// using spark 2.0.2
// I have a Dataset `ds` of type Department   

+-------+--------------------+
|deptId |      location      |
+-------+--------------------+
|     d1|delhi               |            
|     d1|mumbai              |
|    dp2|calcutta            |
|    dp2|hyderabad           |       
+-------+--------------------+

我打算把它转换成

// Dataset `result` of type Result

+-------+--------------------+
|deptId |      locations     |
+-------+--------------------+
|     d1|[delhi,mumbai]      |            
|    dp2|[calcutta,hyderabad]|            
+-------+--------------------+

为此,我在堆栈上进行了搜索,发现以下内容:

val flatten = udf(
  (xs: Seq[Seq[String]]) => xs.flatten)

val result = ds.groupBy("deptId").
                agg(flatten(collect_list("location")).as("locations")

以上对我来说似乎很整洁。

  1. 但在搜索上述内容之前,我首先搜索了 Dataset 是否像 RDD 一样具有内置的reduceByKey。但是找不到,所以选择了上面。但我读了这篇文章grouByKey vs reduceByKey 并了解到reduceByKey 的洗牌更少,效率更高。这是我提出这个问题的第一个原因,我应该在我的场景中选择 RDD 吗?
  2. 我最初选择 Dataset 的原因仅仅是强制执行类型,即。每行的类型为Department。但是由于我的结果有一个完全不同的模式,我应该为类型安全而烦恼吗?所以我尝试做result.as[Result],但这似乎没有做任何编译时类型检查。我选择 Dataset 的另一个原因是,我会将结果 Dataset 传递给其他函数,具有结构使代码易于维护。此外,case 类可以高度嵌套,我无法想象在编写 reduce/map 操作时在 pairRDD 中保持这种嵌套。
  3. 我不确定的另一件事是关于使用udf。我遇到了post,人们说他们更愿意将 Dataset 更改为 RDD,而不是使用 udf 进行复杂的聚合/grouby。
  4. 我也用谷歌搜索了一下,看到有人说 Dataset 有类型检查开销的帖子/文章,但与 RDD 相比,更高的 spark 版本在性能方面更好。再次不确定我应该切换回 RDD 吗?

PS:如果我用错了一些术语,请见谅。

【问题讨论】:

    标签: scala apache-spark dataset rdd user-defined-functions


    【解决方案1】:

    回答你们的一些问题:

    • groupBy + agg 不是 groupByKey - DataFrame / Dataset groupBy behaviour/optimization - 一般情况下。在某些特定情况下,它的行为可能类似,这包括 collect_list
    • reduceByKey 并不比 RDD 风格的 groupByKey 好,当需要 groupByKey 类似的逻辑时 - Be Smart About groupByKey - 事实上它几乎总是更糟。

      李>
    • 在 Spark 的 Dataset - Spark 2.0 Dataset vs DataFrame 中,静态类型检查和性能之间有一个重要的权衡

    • The linked post 特别建议不要使用UserDefinedAggregateFunction(不是UserDefinedFunction),因为过度复制数据 - Spark UDAF with ArrayType as bufferSchema performance issues

    • 您甚至不需要UserDefinedFunction,因为您的情况不需要展平:

      val df = Seq[Department]().toDF
      
      df.groupBy("deptId").agg(collect_list("location").as("locations"))
      

      这就是你应该追求的目标

      静态类型的等价物是

      val ds = Seq[Department]().toDS
      
      ds
        .groupByKey(_.deptId)
        .mapGroups { case (deptId, xs) => Result(deptId, xs.map(_.location).toSeq) }
      

      DataFrame 选项贵很多。

    【讨论】:

    • 非常感谢。非常感谢链接帖子的详尽列表:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-05-17
    • 2011-06-21
    • 1970-01-01
    • 2011-09-13
    • 2012-01-22
    • 2023-03-09
    相关资源
    最近更新 更多