我以前没有尝试过,但是“要生成、缓存和重用查询计划”你应该简单地(重新)使用查询(它可能不一定是“形状”您通常会使用,但有一个可能适合您的情况)。
(大声思考)
每个结构化查询(无论是 Dataset、DataFrame 还是 SQL)都会经历多个阶段,即解析、分析、逻辑优化、规划和物理优化。
结构化查询由其计划描述,优化的物理查询计划是您可以使用Dataset.explain看到的计划:
explain(): Unit 将物理计划打印到控制台以进行调试。
scala> spark.version
res0: String = 2.3.1-SNAPSHOT
scala> :type q
org.apache.spark.sql.DataFrame
scala> q.explain
== Physical Plan ==
*(1) Project [id#0L, (id#0L * 2) AS x2#2L]
+- *(1) Range (0, 4, step=1, splits=8)
您不直接使用计划,但关键是您可以。另一个重要的一点是计划通常对他们优化的数据集一无所知(我说通常因为Spark SQL有一个基于成本的优化器数据以提供最优化的查询计划)。
每当您执行操作时,查询都会通过所谓的结构化查询执行管道。每次执行一个动作时它都会进行“预处理”(即使那是相同的动作)。这就是您可以缓存结果的原因,但这会使查询与数据永远绑定在一起(您希望避免这种情况)。
话虽如此,我认为您可以在调用操作之前进行优化(并通过查询的“管道”抽取数据)。只需使用您可以使用QueryExecution.rdd 生成的优化物理查询计划,该计划将为您提供代表结构化查询的 RDD。使用该 RDD,您可以在每个批处理间隔中简单地 RDD.[theAction],这将避免结构化查询成为 RDD 所经历的所有阶段。
scala> q.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[4] at rdd at <console>:26
您甚至可以改用QueryExecution.toRdd 来“优化”RDD。
scala> q.queryExecution.toRdd
res4: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = MapPartitionsRDD[7] at toRdd at <console>:26
但是(再次大声思考)所有这些重用都是自动发生的,因为阶段是lazy vals 所以只是......不,它不起作用......忽略最后一个“但是”并坚持重用的想法底层 RDD :) 它应该可以工作。
顺便说一句,这几乎就是 Spark Structured Streaming 过去使用微批处理执行每个批处理(间隔)的方式。不过,在 2.3 中,情况发生了变化。