【Question title】: PySpark - MetadataFetchFailedException when calculating TF-IDF
【Posted】: 2021-12-15 17:38:55
【Question description】:

I am computing TF-IDF metrics on a dataset that is originally 569 MB. Although I do eventually get the results, I keep getting the following error:

WARN scheduler.TaskSetManager: Lost task 13.0 in stage 11.0 (TID 84, X.X.X.X, executor 0): FetchFailed(null, shuffleId=4, mapId=-1, reduceId=4, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 4
    at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
    at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
    at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have read the related posts and have already changed some Spark properties, as follows:

spark = (
    SparkSession.builder
    .appName("part_2_task_2")
    .config('spark.executor.memory', '2g')
    .config('spark.executor.memoryOverhead', '1g')
    .config('spark.shuffle.io.maxRetries', 5)
    .config('spark.shuffle.io.retryWait', '30s')
    .config('spark.network.timeout', '200s')
    .getOrCreate()
)

So I currently have the following cluster settings:

spark.executor.cores    2
spark.executor.instances    2
spark.executor.memory   2g
spark.executor.memoryOverhead   1g

In addition, I looked at the failure in more detail in the UI and found that the failing stage comes from line 126 of my code, which is the following join:

tfidf = tf.join(idf)

The two RDDs tf and idf are computed as:

tf = step1.map(lambda x: (x[0][0], (x[0][1], x[0][2], x[0][3], x[1]/x[0][3])))
idf = step1.map(lambda x: (x[0][0], (x[0][2], x[1], 1))). \
    map(lambda x: (x[0], x[1][2])). \
    reduceByKey(lambda x, y: x + y). \
    map(lambda x: (x[0], (x[1], math.log10(number_of_docs/x[1]))))

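The RDDs tf and idf have different .count() values, because tf has one record per document and word while idf has one record per word, which is why I join them. Could this be the problem, meaning I should check that they are equally sized (using partitioning commands) before joining, even though those are expensive? If that is not the problem, what would be the ideal cluster properties for handling data of this size as described above?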
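
On the size question: RDD.join is an inner join by key, so the two sides do not need the same number of records; each tf record is simply paired with the idf record for the same word. The plain-Python sketch below mimics that behaviour without Spark. The records, the `inner_join` helper, and the corpus size of 2 are made up for illustration and use a simplified value layout, not the exact tuples from step1.

```python
import math
from collections import defaultdict


def inner_join(left, right):
    """Mimic RDD.join: pair each left value with every right value sharing its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


number_of_docs = 2  # hypothetical corpus size

# Hypothetical tf records: (word, (doc_id, term_frequency)), one per (word, document)
tf = [("spark", ("d1", 0.5)), ("spark", ("d2", 0.25)), ("join", ("d1", 0.5))]

# idf records: (word, (docs_containing_word, idf)), one per distinct word
doc_counts = defaultdict(int)
for word, _ in tf:
    doc_counts[word] += 1
idf = [(w, (n, math.log10(number_of_docs / n))) for w, n in doc_counts.items()]

# tf has 3 records and idf has 2, yet the join yields one row per tf record
tfidf = inner_join(tf, idf)
```

The joined result has as many rows as tf, because every word in tf also appears exactly once in idf. Differing counts alone therefore would not explain the fetch failure.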

【Comments】:

    Tags: apache-spark pyspark mapreduce rdd spark-submit


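    【Solution 1】:

    Out of the 4G of memory that my virtual machine makes available to me, I gave the executor another 1g of memory, ending up with the following settings: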

    spark.executor.memory   3g
    spark.executor.memoryOverhead   1g
    

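    The exception went away. I am still not sure whether this is the best solution, or whether my code needs fixing to deal with joining two RDDs of different lengths and the partitioning that might be causing the problem. Any explanation would be greatly appreciated, as this is my first attempt at an Apache Spark application.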
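
A possible explanation, offered as a guess rather than a diagnosis: "Missing an output location for shuffle" commonly appears when an executor that held shuffle output has been lost, for example after running out of memory, which would be consistent with the error disappearing once the executor got more memory. If idf (one record per distinct word) is small enough to fit in memory, which the post does not confirm, the shuffle in tf.join(idf) could be avoided with a map-side lookup against a broadcast dictionary. The helper `attach_idf` and the name `idf_by_word` below are hypothetical; the Spark lines are shown as comments and are an untested sketch.

```python
def attach_idf(tf_record, idf_by_word):
    """Map-side join: look up the record's word in a plain dict of idf values.

    Returns the same (key, (left_value, right_value)) shape that RDD.join produces.
    """
    word, tf_value = tf_record
    return (word, (tf_value, idf_by_word[word]))


# Rough Spark usage (assumes idf fits in driver and executor memory):
#   idf_bc = spark.sparkContext.broadcast(idf.collectAsMap())
#   tfidf = tf.map(lambda rec: attach_idf(rec, idf_bc.value))

# Local check with made-up values
example = attach_idf(("spark", ("d1", 0.5)), {"spark": (2, 0.0)})
```

Because every word in tf also occurs in idf (both are derived from step1), the dictionary lookup should not miss a key; with data where that is not guaranteed, the lookup would need a guard.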

    【Discussion】:
