【发布时间】:2015-06-30 01:01:29
【问题描述】:
我是 spark 的新手,我想了解一些关于它在幕后工作的基本机制。 我附上了我的 RDD 的血统,我有以下问题:
- 为什么我有 8 个阶段而不是 5 个阶段?从《Learning from Spark》这本书(第 8 章 http://bit.ly/1E0Hah7)中,我可以理解“与它们的缩进级别相同的 RDD 在物理执行期间,父母将被流水线化[进入同一物理阶段]。由于我有 5 个父母,我预计会有 5 个阶段。仍然是 Spark UI 阶段视图,显示 8 个阶段。 还有什么代表调试字符串中表示的(8)?这个函数有bug吗?
- 在阶段级别,任务之间的执行顺序是什么?
它们可以并行执行
HadoopRDD[0] || MappedRDD[1] || MapPartitionsRDD[4] || ZippedWithIndexRDD[6]
或者他们正在等待每个任务完成
HadoopRDD[0]=>completed=>MappedRDD[1]=>completed=>etc ?
- 阶段之间,顺序由执行计划给定,所以每个阶段都在等待之前的阶段完成。这是一个正确的假设吗?
期待您的回答。
问候, 弗洛林
(8) MappedRDD[21] at map at WAChunkSepvgFilterNewModel.scala:298 []
| MappedRDD[20] at map at WAChunkSepvgFilterNewModel.scala:182 []
| ShuffledRDD[19] at sortByKey at WAChunkSepvgFilterNewModel.scala:182 []
+-(8) ShuffledRDD[16] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:182 []
+-(8) FlatMappedRDD[15] at flatMap at WAChunkSepvgFilterNewModel.scala:174 []
| ZippedWithIndexRDD[14] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:174 []
| MappedRDD[13] at map at WAChunkSepvgFilterNewModel.scala:272 []
| MappedRDD[12] at map at WAChunkSepvgFilterNewModel.scala:161 []
| ShuffledRDD[11] at sortByKey at WAChunkSepvgFilterNewModel.scala:161 []
+-(8) ShuffledRDD[8] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:161 []
+-(8) FlatMappedRDD[7] at flatMap at WAChunkSepvgFilterNewModel.scala:153 []
| ZippedWithIndexRDD[6] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:153 []
| MappedRDD[5] at map at WAChunkSepvgFilterNewModel.scala:248 []
| MapPartitionsRDD[4] at mapPartitionsWithIndex at WAChunkSepvgFilterNewModel.scala:114 []
| test4spark.csv MappedRDD[1] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
| test4spark.csv HadoopRDD[0] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
【问题讨论】:
-
如果包含实际代码会更容易解释。
-
ZipWithIndex 方法开始一个新的工作和一个新的阶段。由于我有 2 个 zipWithIndex 方法调用,我将获得 2 个额外阶段。此外, sortByKey 方法开始一个新的工作阶段,所以我得到了另一个额外的阶段。因此我有8个阶段。问题 2 和 3 的答案不取决于我的代码。它们应该由 Spark 执行框架的核心覆盖。但我找不到关于这些主题的任何文档。
标签: apache-spark