【发布时间】:2021-11-24 14:21:26
【问题描述】:
我有一些方法与 DataFrame 交互并返回我需要的数字。
private def getExpectedPartitionBytes(df: DataFrame, partitionNames: Seq[String] = Seq())
(implicit spark: SparkSession): Long = {
val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count
val expectedTotalBytes = df.queryExecution.optimizedPlan.stats(spark.sessionState.conf)
.sizeInBytes.toLong
val expectedPartitionBytes = expectedTotalBytes / partitionsCount
// dirty estimation from dataframe dimension
// real size of types divided on two
val maxExpectedPartitionBytes = df.dtypes.filter(t => !partitionNames.contains(t._1)).map(_._2).map {
case "StringType" => 10
case "ByteType" => 1
case "ShortType" => 2
case "IntegerType" => 4
case "LongType" => 8
case "FloatType" => 4
case "DoubleType" => 8
case "TimestampType" => 6
case _ => 2
}.sum * df.count / partitionsCount
if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}
}
我对这种方法的性能有疑问。 我们都知道 DataFrame 是懒惰的。 首先,Spark 注册需要在 DataFrame 上执行的计算。然后,当我们请求计算结果时,Spark 开始工作。 在这方面,我想问你。请解释在我的示例中,DataFrame 计算是从什么时候开始的。
在我们为变量赋值的那一刻?
val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count
或者在方法试图向我们返回一个数字的那一刻?
if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}
或者也许除法时会开始计数?
}.sum * df.count / partitionsCount
我也想知道优化我的方法的可能性。我可以以某种方式合理地使用checkpoint() 或cache() 作为我的DataFrame 来减少不必要的计算吗?
我该如何做呢?
【问题讨论】:
标签: scala dataframe apache-spark pyspark apache-spark-sql