【发布时间】:2019-01-17 17:13:26
【问题描述】:
我正在尝试在每批中查找火花流作业中数据帧的大小。我能够成功地找到批处理作业中的大小,但是在流式传输方面我无法做到这一点。
我一直在 databricks 上开发 spark 应用程序,并在流式作业中尝试了“df.queryExecution.optimizedPlan.stats.sizeInBytes”。 但我得到以下异常: 带有流源的查询必须使用 writeStream.start();;;
我尝试将 'df.queryExecution.optimizedPlan.stats.sizeInBytes' 放入 forEachBatch() 函数中:
data.writeStream.foreachBatch { (df: DataFrame, batchId: Long) =>
df.persist()
println("The size of the read is : " + df.queryExecution.optimizedPlan.stats.sizeInBytes)
}.start.option("checkpointLocation", outpath + "/_checkpoint")
但这会创建一个新的流,由于某些限制,我们需要避免这种情况。
val data = spark.readStream
.format("kafka")
.option(....)
.load()
println("The size of the read is : " + data.queryExecution.optimizedPlan.stats.sizeInBytes)
是否有任何 hack 或任何 api 调用在不使用“forEachBatch()”或不创建新流的情况下返回流中数据帧的大小?
【问题讨论】:
-
注意详细说明“但这会创建一个新的流,由于某些限制我们需要避免。”?
标签: python scala apache-spark spark-structured-streaming