【问题标题】:Scala - Obtain DAG with stages and tasks without executionScala - 在不执行的情况下获取具有阶段和任务的 DAG
【发布时间】:2020-04-03 15:09:05
【问题描述】:

我正在寻找一种使用RDD获取Scala Spark应用程序的DAG的方法,包括阶段和任务。

我已经尝试过rdd.toDebugString,但它只显示了 RDD 沿袭,而不是我正在寻找的 DAG。

我知道有显示 DAG 的 Web UI,但我想从代码中提取 DAG,就像 explain 函数对数据框所做的那样。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    以下几点:

    • rdd.toDebugString 仅适用于执行前的 RDD。

    • Execution DAG 是您可以在运行时通过 Spark Web UI 观察 RDD 和 Dataframes 的东西。查看新版本:https://spark.apache.org/docs/3.0.0-preview/web-ui.html

    • 在执行之前您可以为 Dataframes 运行 .explain

    • 来源良好:

    Spark SQL EXPLAIN 运算符提供有关 sql 的详细计划信息 声明而不实际运行它。您可以使用 Spark SQL EXPLAIN 运算符显示 Spark 的实际执行计划 执行引擎将在执行任何查询时生成和使用。 您可以使用此执行计划来优化您的查询。

    Dataframe 的一个简单示例:

    import org.apache.spark.sql.Row     
    val dfsFilename = "/FileStore/tables/sample_text.txt"
    val readFileDF = spark.sparkContext.textFile(dfsFilename)  
    val wordsDF = readFileDF.flatMap(_.split(" ")).toDF
    val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") || (r(0) != "Dumpty"))
                          .groupBy("Value") // Note the value
                          .count().explain()
    

    您为 Dataframe/Dataset 适当地标记了语句,但不是在 show() 上。

    == Physical Plan ==
    *(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
    +- Exchange hashpartitioning(Value#4, 200), [id=#61]
       +- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
          +- *(1) Filter <function1>.apply
             +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
                +- Scan[obj#3]
    

    您的具体问题:不可能,也可能不完全有效,因为几乎不需要考虑优化。

    【讨论】:

    • 不幸的是,我正在查看 RDD 而不是 Dataframe。另外,我想不是从 Web UI 中获取 DAG,而是从代码中获取。我会在我的问题中澄清这一点。
    • 你能接受这个答案吗?很遗憾,答案是没有。没有办法。
    • @MiaTran 你能想出一些方法来从代码中可视化 DAG 吗?
    • @thebluephantom 我们如何从物理计划中进行一些查询优化。你能给我一些例子吗?
    • @AkashPatel 这是一个新问题,但medium.com/datalex/… 您有一个选定的计划,但适应性可能会有所不同。你当然可以重写你的查询来影响优化器
    【解决方案2】:

    除了explain 功能外,您还可以在执行应用程序时查看webUI - 如果您在本地运行,它应该在http://localhost:4040/ 上(如此处的文档所述:https://spark.apache.org/docs/latest/monitoring.html)。它提供作业列表、每个作业的 DAG 可视化、配置等。

    希望这会有所帮助!

    【讨论】:

    • 我正在使用 RDD,所以不能使用 explain 函数。我也在查看 Web UI,但我正在寻找一种从代码中获取 DAG 的方法,类似于 explain 函数
    • 不幸的是,这是不可能的,除了 toDebugString。也就是说,RDD 不是那么可优化的,所以我不确定这一点是否有效。
    • 也许你可以在最后运行一个 .toDF() ,这可能会给出你需要的答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-30
    • 2019-10-02
    • 1970-01-01
    • 1970-01-01
    • 2018-05-29
    • 1970-01-01
    相关资源
    最近更新 更多