这里有两个不同的概念来解释智能查询执行系统是如何工作的,无论是 Spark 还是 RDBMS。
1.证明查询/执行的准确结果
一个查询/执行被解析成一个 DAG,代表不同的执行步骤和它们之间的依赖关系。这些步骤可以表示为Map 或Reduce 类型的步骤。每个独立的步骤都是一个“阶段”,两个阶段由一个随机边界分隔。
可以免费打破阶段之间的这些依赖关系,它们将连续运行(在给定的执行程序中)。
在我的这篇文章中,我解释了 spark 如何按照提供的顺序执行操作以提供正确的结果 - Spark withColumn and where execution order
2.快速提供结果
在一个阶段内,根据 DAG 的定义方式,某些步骤可以并行化。这就是你看到 Spark 会使用许多机制优化执行计划的地方,比如懒惰、先一步运行、催化剂、编码、全阶段代码生成、使用统计信息、谓词下推、列访问、缓存等。新技术是随着事情的发展而添加。这就是 Spark 击败 Hadoop 的地方。在 Hadoop 中,您需要自己编写所有优化,但 Spark 会在后台处理它。同样的 RDBM 也可以工作。如果需要,我可以解释每种技术。
要处理的数据分散在许多执行器中,这些执行器在不同的执行器上运行相同的“阶段”。这称为可扩展性。随着集群大小的增加(对于大型数据集),作业会运行得更快。此行为与 Hadoop 相同。开发人员在某种程度上仍然负责以某种方式编码,以确保实现最大的并行性。
让我们看看你的例子
如果orderBy 没有第一次发生,limit 无法提供准确的结果。所以它将按照orderBy 然后limit 的顺序执行。它永远不会重新安排这个执行顺序。
val df = spark.createDataset(List(("a","b","c"),("a1","b1","c1"),......).toDF("guitarid","make","model")
df.cache()//without this I was not getting the full plan.
val df1 = df.orderBy("make").limit(1)
df1.show(false)
df1.explain(true)
计划如下。逻辑计划建议执行顺序。物理计划使用特殊阶段“TakeOrderedAndProject”优化了执行。
== Analyzed Logical Plan ==
guitarid: string, make: string, model: string
GlobalLimit 1
+- LocalLimit 1
+- Sort [make#8 ASC NULLS FIRST], true
+- Project [_1#3 AS guitarid#7, _2#4 AS make#8, _3#5 AS model#9]
+- LocalRelation [_1#3, _2#4, _3#5]
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Sort [make#8 ASC NULLS FIRST], true
+- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
+- LocalTableScan [guitarid#7, make#8, model#9]
== Physical Plan ==
TakeOrderedAndProject(limit=1, orderBy=[make#8 ASC NULLS FIRST], output=[guitarid#7,make#8,model#9])
+- InMemoryTableScan [guitarid#7, make#8, model#9]
+- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
+- LocalTableScan [guitarid#7, make#8, model#9]
如果我们在orderBy 之前调用limit,那么它会保持相同的顺序 - 限制第一个然后排序以确保结果符合您的预期。它不会给出错误的性能结果
val df1 = df.limit(1).orderBy("make")
df1.show(false)
df1.explain(true)
== Analyzed Logical Plan ==
guitarid: string, make: string, model: string
Sort [make#8 ASC NULLS FIRST], true
+- GlobalLimit 1
+- LocalLimit 1
+- Project [_1#3 AS guitarid#7, _2#4 AS make#8, _3#5 AS model#9]
+- LocalRelation [_1#3, _2#4, _3#5]
== Optimized Logical Plan ==
Sort [make#8 ASC NULLS FIRST], true
+- GlobalLimit 1
+- LocalLimit 1
+- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
+- LocalTableScan [guitarid#7, make#8, model#9]
== Physical Plan ==
*(2) Sort [make#8 ASC NULLS FIRST], true, 0
+- *(2) GlobalLimit 1
+- Exchange SinglePartition
+- *(1) LocalLimit 1
+- InMemoryTableScan [guitarid#7, make#8, model#9]
+- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
+- LocalTableScan [guitarid#7, make#8, model#9]
另一个例子 - 当你想要加入 2 个数据帧时,Spark 可能会选择 Hashjoin 与 broadcasthashjoin 以获得性能,但最终结果将是相同的。
另一方面,如果我们有如下代码。由于这两个操作依赖于单独的列,它们可以按任何顺序执行。
df.withColumn("column10", expression on colum1)
.withColumn("column11", expression on colum2)
结论
我相信 Spark 的执行引擎会以高效的方式提供准确的结果。性能会随着执行引擎的升级而自动提升,所以只要坚持使用 Spark 的最新语法即可。