【问题标题】:pyspark: count number of rows writtenpyspark:计算写入的行数
【发布时间】:2021-06-25 04:48:28
【问题描述】:

当我这样做时

df: DataFrame = ...
df.write.parquet('some://location/')

我可以跟踪和报告(用于监控)刚刚写入some://location 的行数吗?

df.write.parquet('some://location/')
# I imagine something like:
spark_session.someWeirdApi().mostRecentOperation().number_of_rows_written

【问题讨论】:

    标签: python apache-spark pyspark monitoring metrics


    【解决方案1】:

    在做了一些挖掘之后,我找到了一种方法:

    • 您可以通过py4j's callbacks 注册一个QueryExecutionListener(注意,这是source 中的@DeveloperApi 注释)
    • 但您需要在应用程序运行结束时手动启动回调服务器并停止网关。

    这是受 cloudera 社区中 post 的启发,我不得不将它移植到更新的 spark 版本(这使用 spark 3.0.1,那里建议的答案使用已弃用的 SQLContext)和 pyspark(使用 py4j 回调)。

    import numpy as np
    import pandas as pd
    from pyspark.sql import SparkSession, DataFrame
    
    
    class Listener:
        def onSuccess(self, funcName, qe, durationNs):
            print("success", funcName, durationNs, qe.executedPlan().metrics())
            print("rows", qe.executedPlan().metrics().get("numOutputRows").value())
            print("files", qe.executedPlan().metrics().get("numFiles").value())
            print("bytes", qe.executedPlan().metrics().get("numOutputBytes").value())
    
        def onFailure(self, funcName, qe, exception):
            print("failure", funcName, exception, qe.executedPlan().metrics())
    
        class Java:
            implements = ["org.apache.spark.sql.util.QueryExecutionListener"]
    
    
    def run():
        spark: SparkSession = SparkSession.builder.getOrCreate()
    
        df: DataFrame = spark.createDataFrame(pd.DataFrame(np.random.randn(20, 3), columns=["foo", "bar", "qux"]))
    
        gateway = spark.sparkContext._gateway
        gateway.start_callback_server()
    
        listener = Listener()
        spark._jsparkSession.listenerManager().register(listener)
    
        df.write.parquet("/tmp/file.parquet", mode='overwrite')
    
        spark._jsparkSession.listenerManager().unregister(listener)
    
        spark.stop()
        spark.sparkContext.stop()
        gateway.shutdown()
    
    
    if __name__ == '__main__':
        run()
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-21
    • 2018-08-07
    • 2019-07-08
    • 2020-07-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多