【发布时间】:2022-02-16 08:37:14
【问题描述】:
有任何 Spark 和 Py4J 专家可以解释如何从 pyspark 的 Python 端可靠地访问 Spark 的 java 对象和变量吗?具体来说,如何通过python访问Spark的QueryPlanningTracker中的数据?
详情
我正在尝试分析 创建 pyspark 数据框 (df = spark_session.sql(thousand_line_query))。不运行查询。只需创建数据框,以便我可以检查其架构。仅仅等待 .sql() 调用的返回,该调用初始化没有数据的数据帧需要很长时间(10-30 秒)。我已经跟踪到Spark's Analyzer 阶段的缓慢步骤。日志记录(如下)表明 Spark 多次重新计算相同的子查询,所以我试图通过分析 Spark 在我的查询上的工作来深入了解发生了什么。我尝试了一些文章中的方法来分析 Spark Optimizer 阶段以执行查询(例如 Luca Canali 的 sparkMeasure,Rose Toomey 的 Care and Feeding of Catalyst Optimizer)。但是我没有找到专注于分析在优化器阶段之前运行的 Spark Analyzer 阶段的指南。 (因此,我还在下面提供了额外的详细信息,说明我发现其他人可能会觉得有帮助。)
阅读 Spark 的 Scala 源代码,我看到了 the Analyzer is a RuleExecutor 和 RuleExecutors have a QueryPlanningTracker,它们似乎记录了 Spark 运行的每个分析器规则的每次调用的详细信息,特别是允许人们重建时间线分析器正在执行单个查询。
但是,我似乎无法从 python 访问分析器的 QueryPlanningTracker 中的数据。我希望能够检索一个包含运行一个查询的完整详细信息的QueryPlanningTracker java 对象,并检查 Python 代码中可用的字段和方法。有什么建议吗?
示例
在python中使用pyspark,为我的1000行查询请求一个数据框,发现它很慢:
query_sql = 'SELECT ... <long query here>
spark_df = spark_session.sql(query_sql) # takes 10-30 seconds
打开大量日志记录,重新运行上面的查询,查看输出并看到缓慢的步骤都提到了 Spark Analyzer 中的 PlanCheckLogger。还可以访问 Spark 的 RuleExecutor 以查看每个规则使用了多少时间以及哪些规则无效:
spark_session.sparkContext.setLogLevel('ALL')
rule_executor = spark_session._jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor
rule_executor.resetMetrics()
spark_df = spark_session.sql(query_sql) # logs 10,000+ lines of output, lines with keyword `PlanChangeLogger` give timestamps showing the slow steps are in the Analyzer, but not the order of steps that occur
print(rule_executor.dumpTimeSpent()) # prints Analyzer rules that ran, how much time was 'effective' for each rule, but no specifics on order of rules run, no details on why rules took up a lot of time but were not effective.
下一步:尝试(未成功)访问 Spark 的 QueryPlanningTracker 数据以深入了解规则运行的时间线、每次调用每个规则所用的时间以及我可以获得的任何其他细节:
tracker = spark_session._jvm.org.apache.spark.sql.catalyst.QueryPlanningTracker
## Use some call here to show data contents of the tracker; which currently gives E.g. intitial exploration:
tracker.measurePhase.topRulesByTime(10)
*** TypeError: 'JavaPackage' object is not callable ....
以上是一个例子;跟踪器代码表明它具有我可以使用的其他方法和字段,但是我看不到如何访问这些方法和字段,也看不到如何从 Python 中检查哪些方法和字段可用,因此读取 Spark 的 github 存储库只是试错...
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql profiling