在做了一些挖掘之后,我找到了一种方法:
- 您可以通过py4j's callbacks 注册一个QueryExecutionListener(注意,这是source 中的
@DeveloperApi 注释)
- 但您需要在应用程序运行结束时手动启动回调服务器并停止网关。
这是受 cloudera 社区中 post 的启发,我不得不将它移植到更新的 spark 版本(这使用 spark 3.0.1,那里建议的答案使用已弃用的 SQLContext)和 pyspark(使用 py4j 回调)。
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
class Listener:
def onSuccess(self, funcName, qe, durationNs):
print("success", funcName, durationNs, qe.executedPlan().metrics())
print("rows", qe.executedPlan().metrics().get("numOutputRows").value())
print("files", qe.executedPlan().metrics().get("numFiles").value())
print("bytes", qe.executedPlan().metrics().get("numOutputBytes").value())
def onFailure(self, funcName, qe, exception):
print("failure", funcName, exception, qe.executedPlan().metrics())
class Java:
implements = ["org.apache.spark.sql.util.QueryExecutionListener"]
def run():
spark: SparkSession = SparkSession.builder.getOrCreate()
df: DataFrame = spark.createDataFrame(pd.DataFrame(np.random.randn(20, 3), columns=["foo", "bar", "qux"]))
gateway = spark.sparkContext._gateway
gateway.start_callback_server()
listener = Listener()
spark._jsparkSession.listenerManager().register(listener)
df.write.parquet("/tmp/file.parquet", mode='overwrite')
spark._jsparkSession.listenerManager().unregister(listener)
spark.stop()
spark.sparkContext.stop()
gateway.shutdown()
if __name__ == '__main__':
run()