【Posted】: 2021-12-15 17:38:55
【Question】:
I am computing TF-IDF metrics on a dataset that was originally 569 MB. Although I eventually get a result, I keep getting the following error:
WARN scheduler.TaskSetManager: Lost task 13.0 in stage 11.0 (TID 84, X.X.X.X, executor 0): FetchFailed(null, shuffleId=4, mapId=-1, reduceId=4, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 4
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have read the related posts and have already changed some Spark properties, as follows:
spark = (
    SparkSession.builder
    .appName("part_2_task_2")
    .config('spark.executor.memory', '2g')
    .config('spark.executor.memoryOverhead', '1g')
    .config('spark.shuffle.io.maxRetries', 5)
    .config('spark.shuffle.io.retryWait', '30s')
    .config('spark.network.timeout', '200s')
    .getOrCreate()
)
So at the moment the cluster has the following settings:
spark.executor.cores 2
spark.executor.instances 2
spark.executor.memory 2g
spark.executor.memoryOverhead 1g
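As a rough sanity check, the settings above can be turned into a resource budget. This is only a sketch: it assumes a resource manager such as YARN or Kubernetes, where each executor is requested as executor memory plus memory overhead, and it assumes `spark.task.cpus` is left at its default of 1. The helper name `cluster_budget` is hypothetical.

```python
# Values copied from the settings listed above.
EXECUTOR_INSTANCES = 2
EXECUTOR_CORES = 2
EXECUTOR_MEMORY_GB = 2
EXECUTOR_MEMORY_OVERHEAD_GB = 1


def cluster_budget(instances, cores, memory_gb, overhead_gb, task_cpus=1):
    """Return (concurrent task slots, GB requested per executor, GB requested in total).

    task_cpus=1 is an assumption (the default value of spark.task.cpus).
    """
    task_slots = instances * (cores // task_cpus)
    per_executor_gb = memory_gb + overhead_gb
    total_gb = instances * per_executor_gb
    return task_slots, per_executor_gb, total_gb


slots, per_executor_gb, total_gb = cluster_budget(
    EXECUTOR_INSTANCES,
    EXECUTOR_CORES,
    EXECUTOR_MEMORY_GB,
    EXECUTOR_MEMORY_OVERHEAD_GB,
)
print(f"task slots: {slots}, per executor: {per_executor_gb} GB, total: {total_gb} GB")
```

Under these assumptions, at most four tasks run at once, and the two tasks on each executor share its 2 GB heap.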
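In addition, I looked at the UI for more details about the problem and found that the failing stage comes from line 126 of my code, which is the following join: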
tfidf = tf.join(idf)
The two RDDs, tf and idf, are computed as:
tf = step1.map(lambda x: (x[0][0], (x[0][1], x[0][2], x[0][3], x[1]/x[0][3])))
idf = (
    step1.map(lambda x: (x[0][0], (x[0][2], x[1], 1)))
    .map(lambda x: (x[0], x[1][2]))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: (x[0], (x[1], math.log10(number_of_docs / x[1]))))
)
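The RDDs tf and idf have different .count() values, because tf has one record per document and word while idf has one record per word, which is why I join them. Could this be the problem? Should I make sure they are the same size before joining, using the partitioning commands, even though those are expensive? If this is not the problem, what are the ideal cluster properties for processing data of this size as described above?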
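To make the size question concrete, here is a minimal plain-Python sketch of what the join does on toy data. It is not the code from line 126: the record layout `(word, (doc_id, tf_value))`, the helper names `compute_idf` and `join_tf_with_idf`, and the sample values are all hypothetical simplifications. It illustrates that an inner join on the word key does not need both sides to have the same number of records: each tf record simply picks up the single idf entry for its word.

```python
import math


def compute_idf(tf_records, number_of_docs):
    """Count the documents containing each word, then apply log10(N / df).

    Assumes each (word, document) pair appears once in tf_records.
    """
    doc_freq = {}
    for word, _ in tf_records:
        doc_freq[word] = doc_freq.get(word, 0) + 1
    return {
        word: (df, math.log10(number_of_docs / df))
        for word, df in doc_freq.items()
    }


def join_tf_with_idf(tf_records, idf_lookup):
    """Inner join on the word key, shaped like RDD.join output: (key, (left, right))."""
    return [
        (word, (tf_value, idf_lookup[word]))
        for word, tf_value in tf_records
        if word in idf_lookup
    ]


# Hypothetical toy data: one record per (word, document) pair.
tf_records = [
    ("spark", ("d1", 0.5)),
    ("rdd", ("d1", 0.5)),
    ("spark", ("d2", 1.0)),
]
number_of_docs = 2

idf_lookup = compute_idf(tf_records, number_of_docs)  # one entry per word
joined = join_tf_with_idf(tf_records, idf_lookup)     # one entry per tf record

print(len(tf_records), len(idf_lookup), len(joined))
```

The dictionary lookup here behaves like a map-side join. In PySpark the same idea could be sketched with `sc.broadcast(idf.collectAsMap())` followed by `tf.map(...)`, which avoids shuffling tf for the join, but only under the assumption that the per-word idf table fits in memory on the driver and on each executor.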
【Comments】:
Tags: apache-spark pyspark mapreduce rdd spark-submit