【问题标题】:Saving large file exceeds frameLimit保存大文件超出 frameLimit
【发布时间】:2016-08-17 18:40:14
【问题描述】:

我尝试保存一个大约为 1 的大文本文件。 5GB

sc.parallelize(cfile.toString()
  .split("\n"), 1)
  .saveAsTextFile(new Path(path+".cs", "data").toUri.toString)

但我不断得到

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
...
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

我已经被困在这里很久了。谁能在这里帮助我并解释如何将cfile 保存为文本文件?


独立/本地/Yarn 集群?

  • 纱团

内存/核心设置?

  • 1.8 TB
  • 285 核

分区数?

  • 我目前正在设置分区数为1

设置分区数的相关代码行:

val model = word2vec
  .setMinCount(minCount.asInstanceOf[Int])
  .setVectorSize(arguments.getVectorSize)
  .setWindowSize(arguments.getContextWindowSize)
  .setNumPartitions(numW2vPartitions)
  .setLearningRate(learningRate)
  .setNumIterations(arguments.getNumIterations)
  .fit(wordSequence)

spark-submit 参数:

spark-submit --master yarn 
             --deploy-mode cluster 
             --driver-memory 20G 
             --num-executors 5 
             --executor-cores 8 
             --driver-java-options "-Dspark.akka.frameSize=2000" 
             --executor-memory 20G --class

【问题讨论】:

    标签: apache-spark word2vec


    【解决方案1】:

    独立/本地/纱线集群? 内存/核心设置? 分区数?

    您的错误可能表明其中一名工人已经离开(OOM 杀手可能已经杀死了它或者它出现了一些 OOM 错误)

    我不确定你为什么要这样做:cfile.toString().split("\n") - 从这里我了解到你将所有 5GB 内容保存在内存中并尝试并行化它?显然它不是最优的。 另一个可能相关的问题 - 如果您的驱动程序可以以某种方式将所有 5GB 存储在内存中,但驱动程序工作人员之间的所有网络层仍然不喜欢这么多的数据量 - 所以我的建议是将其分成多个分区。

    相反,您可以使用 sc.textFile(..) 读取文件,然后将其保存到新路径中。您还可以使用 sc.textFile(..).repartition(100) 控制文本文件的分区数。

    【讨论】:

    • 实际上我正在尝试保存一个词向量模型,有一个 spark versionsave() 方法根本不起作用,我不知道为什么。我联系了用户组,但此后没有得到任何答复。这就是为什么我认为我可以简单地将我的模型保存为 CSV 文件并规避看似损坏的实现。我会用您要求的信息更新我的问题。
    • 驱动程序实际上有 20GB 的 RAM,对于我目前使用的 8 个工作人员来说都是一样的。
    • 那么您是如何将模型转换为 cfile 的呢?我的意思是假设模型是某种 rdd 并且您想将其保存为文本,因此您可以将此 rdd 转换为并行字符串的 rdd(而不像您可能那样将其全部带到驱动程序),然后将其保存到 hdfs也平行
    • 嗯.. spark 返回一个Map[String, Array[Float]],它是从一个词到它的词向量的映射。 Word2Vec#SaveLoadV1_0#save() 应该为我处理这个问题,但它要么坏了,要么我根本不明白我必须做什么才能完成这项工作。因为它没有,所以我只是迭代地图并将所有 (word, word-vector) 对转换为 C 行。那最终是一个Seq[String],我想将其存储在 one 大文件中。原因是我将在另一个库中使用该模型文件,因为 spark w2v 目前没有 doc2vec 实现。
    • 我不得不承认我对 Spark(Scala 也是如此)相当陌生,这可能只是我缺乏一些基本的理解或知识。我仍然不明白为什么一切正常,除非我尝试保存那个需要 24 小时才能训练的模型..
    猜你喜欢
    • 1970-01-01
    • 2014-12-03
    • 2020-11-07
    • 1970-01-01
    • 2021-04-23
    • 1970-01-01
    • 2016-09-24
    • 1970-01-01
    • 2017-01-24
    相关资源
    最近更新 更多