【发布时间】:2020-09-06 21:44:39
【问题描述】:
我正在尝试使用以下方法获取数据帧的分区数:
df.rdd.getNumPartitions.toString
但是,当我监控 spark 日志时,我发现它旋转了很多阶段,而且操作成本很高。
据我了解,dataframe 通过元数据向 rdd 添加了结构层。那么,为什么在转换为 rdd 时剥离它需要这么长时间?
【问题讨论】:
标签: apache-spark apache-spark-sql rdd
我正在尝试使用以下方法获取数据帧的分区数:
df.rdd.getNumPartitions.toString
但是,当我监控 spark 日志时,我发现它旋转了很多阶段,而且操作成本很高。
据我了解,dataframe 通过元数据向 rdd 添加了结构层。那么,为什么在转换为 rdd 时剥离它需要这么长时间?
【问题讨论】:
标签: apache-spark apache-spark-sql rdd
DataFrame 是一个优化的分布式表格集合。由于它保持表格格式(类似于 SQL 表),它可以保留元数据以允许 Spark 在后台执行一些优化。
此优化由诸如Catalyst 和Tungsten 等辅助项目执行
RDD 不维护任何模式,如果需要,您需要提供一个。所以RDD没有Dataframe那么优化,(完全不涉及Catalyst)
将 DataFrame 转换为 RDD 会强制 Spark 循环遍历所有元素,将它们从高度优化的 Catalyst 空间转换为 scala 空间。
检查来自.rdd的代码
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
@transient private lazy val rddQueryExecution: QueryExecution = {
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
首先,它正在执行计划并将输出检索为RDD[InternalRow],顾名思义,它仅供内部使用,需要转换为RDD[Row]
然后它遍历所有转换它们的行。如您所见,这不仅仅是删除架构
希望能回答您的问题。
【讨论】: