【问题标题】:Profiling the Spark Analyzer: how to access the QueryPlanningTracker for a pyspark query?分析 Spark 分析器:如何访问 QueryPlanningTracker 以进行 pyspark 查询?
【发布时间】:2022-02-16 08:37:14
【问题描述】:

有任何 Spark 和 Py4J 专家可以解释如何从 pyspark 的 Python 端可靠地访问 Spark 的 java 对象和变量吗?具体来说,如何通过python访问Spark的QueryPlanningTracker中的数据?

详情

我正在尝试分析 创建 pyspark 数据框 (df = spark_session.sql(thousand_line_query))。不运行查询。只需创建数据框,以便我可以检查其架构。仅仅等待 .sql() 调用的返回,该调用初始化没有数据的数据帧需要很长时间(10-30 秒)。我已经跟踪到Spark's Analyzer 阶段的缓慢步骤。日志记录(如下)表明 Spark 多次重新计算相同的子查询,所以我试图通过分析 Spark 在我的查询上的工作来深入了解发生了什么。我尝试了一些文章中的方法来分析 Spark Optimizer 阶段以执行查询(例如 Luca Canali 的 sparkMeasure,Rose Toomey 的 Care and Feeding of Catalyst Optimizer)。但是我没有找到专注于分析在优化器阶段之前运行的 Spark Analyzer 阶段的指南。 (因此,我还在下面提供了额外的详细信息,说明我发现其他人可能会觉得有帮助。)

阅读 Spark 的 Scala 源代码,我看到了 the Analyzer is a RuleExecutorRuleExecutors have a QueryPlanningTracker,它们似乎记录了 Spark 运行的每个分析器规则的每次调用的详细信息,特别是允许人们重建时间线分析器正在执行单个查询。

但是,我似乎无法从 python 访问分析器的 QueryPlanningTracker 中的数据。我希望能够检索一个包含运行一个查询的完整详细信息的QueryPlanningTracker java 对象,并检查 Python 代码中可用的字段和方法。有什么建议吗?

示例

在python中使用pyspark,为我的1000行查询请求一个数据框,发现它很慢:

query_sql = 'SELECT ... <long query here>
spark_df = spark_session.sql(query_sql) # takes 10-30 seconds

打开大量日志记录,重新运行上面的查询,查看输出并看到缓慢的步骤都提到了 Spark Analyzer 中的 PlanCheckLogger。还可以访问 Spark 的 RuleExecutor 以查看每个规则使用了多少时间以及哪些规则无效:

spark_session.sparkContext.setLogLevel('ALL')
rule_executor = spark_session._jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor
rule_executor.resetMetrics()
spark_df = spark_session.sql(query_sql)  # logs 10,000+ lines of output, lines with keyword `PlanChangeLogger` give timestamps showing the slow steps are in the Analyzer, but not the order of steps that occur
print(rule_executor.dumpTimeSpent())  # prints Analyzer rules that ran, how much time was 'effective' for each rule, but no specifics on order of rules run, no details on why rules took up a lot of time but were not effective.

下一步:尝试(未成功)访问 Spark 的 QueryPlanningTracker 数据以深入了解规则运行的时间线、每次调用每个规则所用的时间以及我可以获得的任何其他细节:

tracker = spark_session._jvm.org.apache.spark.sql.catalyst.QueryPlanningTracker
## Use some call here to show data contents of the tracker; which currently gives E.g. intitial exploration: 
tracker.measurePhase.topRulesByTime(10)
*** TypeError: 'JavaPackage' object is not callable ....

以上是一个例子;跟踪器代码表明它具有我可以使用的其他方法和字段,但是我看不到如何访问这些方法和字段,也看不到如何从 Python 中检查哪些方法和字段可用,因此读取 Spark 的 github 存储库只是试错...

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql profiling


    【解决方案1】:

    你可以试试这个:

    >>> df = spark.range(1000).selectExpr("count(*)")
    
    >>> tracker = df._jdf.queryExecution().tracker()
    >>> print(tracker)
    org.apache.spark.sql.catalyst.QueryPlanningTracker@5702d8be
    
    >>> print(tracker.topRulesByTime(10))
    Stream((org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions,RuleSummary(27004600, 2, 1)), ?)
    

    我不确定您需要什么样的信息。但是如果你想知道生成的查询计划。你可以使用df.explain()

    【讨论】:

    • 谢谢;这有帮助,但我只是一个规则的摘要,而不是每个规则调用的详细信息:` df = spark_session.sql(sql_clause) tracker = df._jdf.queryExecution().tracker() breakpoint() (Pdb) print(tracker ) org.apache.spark.sql.catalyst.QueryPlanningTracker@23d90c60 (Pdb) print(tracker.topRulesByTime(10))Stream((org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences,RuleSummary(3514149520, 23 , 18)), ?) (Pdb) ` 我试图显示每个规则调用、规则运行时间和规则参数的时间线。
    猜你喜欢
    • 1970-01-01
    • 2015-09-28
    • 2014-10-07
    • 2015-05-04
    • 2020-12-03
    • 1970-01-01
    • 2010-11-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多