【Posted】: 2020-09-09 23:48:23
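【Question】:
I'm new to Spark. I have created a Spark job that does some data processing for each user. What I want to do is get all the files in a directory and process them. There are multiple directories, and there are multiple users.
After reading the files in one user's directory, I apply some transformations, and then I need to process the results collectively (for example, removing some duplicates based on the data). To do this, I call collect() on the RDD.
It works fine when run with 10 directories, but when run with 1000 directories it gets stuck at the collect() call.
I have only tested this locally.
Spark initialization: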
private lazy val sparkSession = SparkSession
  .builder()
  .appName("Custom Job")
  .master("local[*]")
  .getOrCreate()
Reading the directories and parallelizing them:
val allDirs: Seq[String] = fs.getAllDirInPath(Configuration.inputDir)
val paths: RDD[String] = SessionWrapper.getSparkContext.parallelize(allDirs)
Transformation and collect() call:
paths.foreachPartition { partition =>
  partition.foreach { dir =>
    val dirData = readDataByDir(dir) // RDD[String]
    val transformed = doTranform(dirData) // RDD[CustomObject]
    val collectedData = transformed.collect()
    // Do something on collected data
    writeToFile(collectedData)
  }
}
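A note on the snippet above: it creates RDDs and calls collect() from inside foreachPartition, that is, from inside tasks of another Spark job. Spark does not support nesting RDD operations this way. In local mode it can appear to work with a few directories, but one plausible explanation for the hang (the logs alone do not confirm it) is that every task slot is held by an outer task waiting on an inner collect() job that has no free slot to run in. A minimal sketch of the usual alternative, looping over the directories on the driver, is shown below. The names Record, readDataByDir, and transform are hypothetical stand-ins for the helpers in the question, and the directory list is assumed to come from the command-line arguments.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PerDirectoryJob {
  // Hypothetical stand-in for the question's CustomObject.
  final case class Record(dir: String, line: String)

  // Hypothetical stand-in for readDataByDir: reads all files under one directory as lines.
  def readDataByDir(spark: SparkSession, dir: String): RDD[String] =
    spark.sparkContext.textFile(dir)

  // Hypothetical stand-in for doTranform: tags each line with its directory
  // and removes exact duplicates on the executors, before anything is collected.
  def transform(dir: String, lines: RDD[String]): RDD[Record] =
    lines.map(line => Record(dir, line)).distinct()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Custom Job")
      .master("local[*]")
      .getOrCreate()

    // Assumption: the directories to process are passed as program arguments.
    val allDirs: Seq[String] = args.toSeq

    // Plain Scala loop on the driver: one Spark job per directory,
    // and no RDD is ever created or collected inside another job's task.
    allDirs.foreach { dir =>
      val collected: Array[Record] = transform(dir, readDataByDir(spark, dir)).collect()
      // Replace with the real post-processing and writeToFile call.
      println(s"$dir: ${collected.length} records")
    }

    spark.stop()
  }
}
```

Each directory is still processed in parallel across its own partitions. Only the iteration over directories is sequential, which keeps the scheduler free to run every collect() job.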
Some logs from the console where it hangs:
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO FileInputFormat: Total input paths to process : 3
20/09/09 19:24:40 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 65935
20/09/09 19:24:40 INFO DAGScheduler: Got job 2 (collect at MyCustomHelperWithCollectCall.scala:18) with 2 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 2 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[102] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 7.2 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.31.222:55666 (size: 3.5 KiB, free: 2004.3 MiB)
20/09/09 19:24:40 INFO SparkContext: Created broadcast 14 from broadcast at DAGScheduler.scala:1200
20/09/09 19:24:40 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[102] at map at MyCustomHelperWithCollectCall.scala:18) (first 15 tasks are for partitions Vector(0, 1))
20/09/09 19:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
20/09/09 19:24:40 INFO DAGScheduler: Got job 3 (collect at MyCustomHelperWithCollectCall.scala:18) with 1 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 3 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[96] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 7.2 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_15_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on 192.168.31.222:55666 (size: 3.5 KiB, free: 2004.3 MiB)
20/09/09 19:24:40 INFO SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:1200
20/09/09 19:24:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[96] at map at MyCustomHelperWithCollectCall.scala:18) (first 15 tasks are for partitions Vector(0))
20/09/09 19:24:40 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
20/09/09 19:24:40 INFO DAGScheduler: Got job 4 (collect at MyCustomHelperWithCollectCall.scala:18) with 1 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 4 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[101] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO FileInputFormat: Total input paths to process : 5
Please help!
【Discussion】:
Tags: scala apache-spark