【问题标题】:Spark, applying filters on DataFrame(or RDD) multiple times without redundant evaluationsSpark,在 DataFrame(或 RDD)上多次应用过滤器,无需冗余评估
【发布时间】:2020-03-11 03:41:02
【问题描述】:

我有一个 Spark DataFrame,需要对父 RDD 的链接进行大量评估。

val df: DataFrame[(String, Any)] = someMethodCalculatingDF()
val out1 = df.filter(_._1 == "Key1").map(_._2).collect()
val out2 = df.filter(_._1 == "Key2").map(_._2)

out1 是一个非常小的数据(每个分区中只有一到两行)并收集起来以供进一步使用。 out2 是一个 Dataframe,将用于生成另一个 RDD,稍后将具体化。 因此,df 将被评估两次,这很重。

Caching 可能是一个解决方案,但在我的应用程序中,它不会是,因为数据可能真的非常大。内存会溢出。

有没有天才 :) 谁能提出另一种绕过冗余评估的方法?

【问题讨论】:

  • 你能在someMethodCalculatingDF之前做过滤吗?您仍然会运行该方法两次,但数据量会有所不同(计算 out1 时会少很多)。
  • 不.. 不幸的是.. T^T

标签: scala apache-spark lazy-evaluation


【解决方案1】:

这实际上是我们集群中每天都会发生的场景。根据我们的经验,这种方法最适合我们。

当我们需要两次(在不同的分支上)使用相同的计算数据帧时,我们执行以下操作:

  1. 计算阶段很繁重,导致数据帧相当小 -> 缓存它。

  2. 计算阶段很轻,导致数据框很大 -> 让它计算两次。

  3. 计算量大导致数据帧大 -> 将其写入磁盘(HDFS 或 S3)将拆分点上的作业拆分为两个不同的批处理作业。在这种情况下,您无需重复繁重的计算,也无需粉碎缓存(无论哪种方式都可能使用磁盘)。

  4. 计算阶段很轻,导致数据帧很小。你的生活很好,你可以回家了:)。

【讨论】:

    【解决方案2】:

    我不熟悉数据集 API,所以将使用 RDD api 编写解决方案。

    val rdd: RDD[(String, Int)] = ???
    
    //First way
    val both: Map[String, Iterable[Int]] = rdd.filter(e => e._1 == "Key1" || e._1 == "Key2")
      .groupByKey().collectAsMap()
    
    //Second way
    val smallCached = rdd.filter(e => e._1 == "Key1" || e._1 == "Key2").cache()
    val out1 = smallCached.filter(_._1 == "Key1").map(_._2).collect()
    val out2 = smallCached.filter(_._1 == "Key2").map(_._2).collect()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-20
      • 2018-11-01
      • 2016-05-26
      • 2017-02-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多