【问题标题】:How to find the size of the dataframe in spark streaming jobs如何在火花流作业中查找数据帧的大小
【发布时间】:2019-01-17 17:13:26
【问题描述】:

我正在尝试在每批中查找火花流作业中数据帧的大小。我能够成功地找到批处理作业中的大小,但是在流式传输方面我无法做到这一点。

我一直在 databricks 上开发 spark 应用程序,并在流式作业中尝试了“df.queryExecution.optimizedPlan.stats.sizeInBytes”。 但我得到以下异常: 带有流源的查询必须使用 writeStream.start();;;

我尝试将 'df.queryExecution.optimizedPlan.stats.sizeInBytes' 放入 forEachBatch() 函数中:

data.writeStream.foreachBatch { (df: DataFrame, batchId: Long) =>
df.persist() 
println("The size of the read is : " + df.queryExecution.optimizedPlan.stats.sizeInBytes)                              
}.start.option("checkpointLocation", outpath + "/_checkpoint") 

但这会创建一个新的流,由于某些限制,我们需要避免这种情况。

val data = spark.readStream
                .format("kafka") 
                .option(....)
                .load()

println("The size of the read is : " + data.queryExecution.optimizedPlan.stats.sizeInBytes) 

是否有任何 hack 或任何 api 调用在不使用“forEachBatch()”或不创建新流的情况下返回流中数据帧的大小?

【问题讨论】:

  • 注意详细说明“但这会创建一个新的流,由于某些限制我们需要避免。”?

标签: python scala apache-spark spark-structured-streaming


【解决方案1】:

您可以尝试关注

很少进口

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd
import org.apache.spark.util.SizeEstimator

计算RDD的大小

def calcRDDSize(rdd: RDD[String]): Long = {
    rdd.map(_.getBytes("UTF-8").length.toLong)
    .reduce(_+_) //add the sizes together
}

大小计算为

val rdd1 = df.rdd.map(_.toString())
calcRDDSize(rdd1)

其中 df 是您的数据框。它将以字节为单位估计大小。

希望这会有所帮助:)

【讨论】:

  • 如果我们将数据帧转换为rdd,我们将无法获得数据帧的确切大小。此外,如果将每一行转换为字符串,它会改变整个画面。
猜你喜欢
  • 2018-08-12
  • 2015-12-11
  • 2016-03-23
  • 2019-07-07
  • 2020-03-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-23
相关资源
最近更新 更多