【问题标题】:Need sth like "def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???"需要像“def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???”
【发布时间】:2015-11-14 19:03:00
【问题描述】:

在我们使用 groupByKey(...): RDD[(K, Iterable[V]] 的用例中,即使对于单个键(尽管是极端情况),关联的 Iterable[ V] 可能导致OOM。

是否可以提供上述'groupByKeyWithRDD'?

而且,理想情况下,如果 RDD[V] 的内部 impl 足够聪明,只在配置的阈值时将数据溢出到磁盘中,那就太好了。这样,我们也不会牺牲正常情况下的性能。

欢迎任何建议/cmets。非常感谢!

顺便说一句:我们确实理解这里提到的几点:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html,而 'reduceByKey'、'foldByKey' 目前不太适合我们的需求,也就是说,我们无法真正避免'groupByKey'。

【问题讨论】:

  • 我不太明白你的问题!
  • 不幸的是,嵌套的 RDD 是不可能的。 this thread 上的最后一篇文章很好地解释了原因。我认为没有办法可以解决您的情况(或者一般来说,在运行 *ByKey 操作时出现严重的键偏差)。如果您能找到某种方式在逻辑上更均匀地分配密钥,那显然会有所帮助。
  • 通常,我所做的是将两个 RDD 与一个相似的“键”相关联,然后对该键执行连接操作。这是一项非常昂贵的操作,但它仍然是一个很好的解决方法。
  • @RohanAletty 感谢您的评论。不一定是嵌套的 RDD,RDD[(K, Stream[V])] 也可以,只要值是惰性集合即可。

标签: apache-spark rdd


【解决方案1】:

假设#(of-unique-keys) RDD[(K, RDD[V])]。相反,您可以通过使用过滤器映射唯一键来转换为Map[(K, RDD[V])]

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def splitByKey[K : ClassTag, V: ClassTag](rdd: RDD[(K, V)]): Map[K, RDD[V]] = {
  val keys = rdd.keys.distinct.collect.toSeq
  keys.map(key => (key -> rdd.filter{case (k, _) => k == key}.values)).toMap
}

它需要对数据进行多次扫描,因此它并不便宜,但不需要洗牌,可以更好地控制缓存,并且只要初始 RDD 适合内存,就不太可能导致 OOM。

【讨论】:

    猜你喜欢
    • 2018-11-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-16
    • 2014-11-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多