【问题标题】:groupBy cannot handle large RDDsgroupBy 无法处理大型 RDD
【发布时间】:2017-08-19 21:07:12
【问题描述】:

代码如下:

val words = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/eng_words.txt" )
words.take(1000000).foreach(println _)
words.take(150000).groupBy((x: String) => x.head).map {
  case (c, iter)  => (c, iter.toList.size)
}.foreach {
  println _
}

eng_words.txt 是一个包含大约 100 万个英文单词的文本文件,每行一个。一旦 RDD 超过 150000,groupBy 将崩溃并出现此错误:

java.util.NoSuchElementException: next on empty iterator
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
  at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
  at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
  at scala.collection.IterableLike$class.head(IterableLike.scala:107)
  at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:30)
  at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
  at scala.collection.immutable.StringOps.head(StringOps.scala:30)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at $anon$1$$anonfun$run$1.apply(<console>:23)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:332)
  at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:331)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:331)
  at scala.collection.mutable.ArrayOps$ofRef.groupBy(ArrayOps.scala:186)
  at $anon$1.run(<console>:23)
  at Helper.HasRun$class.newRun(HasRun.scala:21)
  at $anon$1.newRun(<console>:19)
  ... 55 elided

出了什么问题?

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    在这种特殊情况下,它很可能无法处理空字符串。尽管如此,不要groupBy,不要打电话给toList,也不要盲目相信输入格式正确。

    • head 将在空行上失败并显示您看到的错误

    • groupBygroupByKey 相同,要求每个键的所有记录都适合执行程序内存。

    这里还有另一个字数:

    words
      // Make sure that it won't fail on empty string with
      // java.util.NoSuchElementException: next on empty iterator
      .flatMap(_.headOption) 
      // Map to pairs and reduce to avoid excessive shuffling and limit memory usage
      .map((_, 1))
      .reduceByKey(_ + _)
    

    【讨论】:

    • 等等,对吗?我认为如果无法访问足够的内存,执行程序可以无缝地将他们需要的内容溢出到磁盘上。当然,这会带来很大的放缓,但它应该仍然有效。无论如何,reduceByKey 更好,就像您建议的那样,因为它允许执行程序在执行其余操作之前执行聚合。
    • @JeffL。你在这里是部分正确的。如果需要,无论如何都可以溢出数据,但不是单个键。你得到的值只是 ArrayBuffer 的一个变体,因此必须适合内存。
    • 这很有趣,谢谢。当你深入了解这样的细节时,有时很难找到关于 Spark 在幕后做了什么的好信息。此外,我应该认为,您分配给集群执行程序的资源可以缓解其中的一些问题。这似乎很重要,因为reduceByKey 没有等效的数据框,如果我没记错的话。
    • @JeffL。略有不同。 Data(set|Frame).groupBy 是一种逻辑操作,而不是物理操作。对于 DF,您可以查看stackoverflow.com/q/32902982/1560062。关于 RDD,它几乎(不)出名avoid groupByKey
    猜你喜欢
    • 2018-10-04
    • 2015-09-09
    • 2019-06-18
    • 1970-01-01
    • 2015-04-10
    • 2011-07-28
    • 1970-01-01
    • 1970-01-01
    • 2017-03-30
    相关资源
    最近更新 更多