【问题标题】:Evaluation of the method working with the DataFrame评估使用 DataFrame 的方法
【发布时间】:2021-11-24 14:21:26
【问题描述】:

我有一些方法与 DataFrame 交互并返回我需要的数字。

     private def getExpectedPartitionBytes(df: DataFrame, partitionNames: Seq[String] = Seq())
                                          (implicit spark: SparkSession): Long = {
    
        val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count
           
        val expectedTotalBytes = df.queryExecution.optimizedPlan.stats(spark.sessionState.conf)
                                                                .sizeInBytes.toLong
    
        val expectedPartitionBytes = expectedTotalBytes / partitionsCount
        // dirty estimation from dataframe dimension
        // real size of types divided on two
        val maxExpectedPartitionBytes = df.dtypes.filter(t => !partitionNames.contains(t._1)).map(_._2).map {
          case "StringType" => 10
          case "ByteType" => 1
          case "ShortType" => 2
          case "IntegerType" => 4
          case "LongType" => 8
          case "FloatType" => 4
          case "DoubleType" => 8
          case "TimestampType" => 6
          case _ => 2
        }.sum * df.count / partitionsCount
    
    
        if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
          // if plan estimation is exists and real
          expectedPartitionBytes
        } else {
          maxExpectedPartitionBytes
        }
      }

我对这种方法的性能有疑问。 我们都知道 DataFrame 是懒惰的。 首先,Spark 注册需要在 DataFrame 上执行的计算。然后,当我们请求计算结果时,Spark 开始工作。 在这方面,我想问你。请解释在我的示例中,DataFrame 计算是从什么时候开始的。

在我们为变量赋值的那一刻?

val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count

或者在方法试图向我们返回一个数字的那一刻?

if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
          // if plan estimation is exists and real
          expectedPartitionBytes
        } else {
          maxExpectedPartitionBytes
        }

或者也许除法时会开始计数?

}.sum * df.count / partitionsCount

我也想知道优化我的方法的可能性。我可以以某种方式合理地使用checkpoint()cache() 作为我的DataFrame 来减少不必要的计算吗? 我该如何做呢?

【问题讨论】:

    标签: scala dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    请说明在我的示例中,DataFrame 计算是从什么时候开始的。

    partitionCount 的值被立即评估,因为count 调用是一个“动作”。如果你想推迟评估,你可以在 Scala 中声明一个惰性变量:

    lazy val partitionCount = df.count 
    partitionCount: Long = <lazy>
    

    您会看到该类型如预期的那样是Long,但没有关联的值。当您第一次访问 val 时,表达式将被计算。

    我能否以某种方式合理地为我的 DataFrame 使用 checkpoint() 或 cache() 来减少不必要的计算?

    查看您提供的代码,您没有进行任何重复的工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-07-20
      相关资源
      最近更新 更多