【问题标题】:Does pyspark changes order of instructions for optimization?pyspark 是否会更改优化指令的顺序?
【发布时间】:2020-03-30 09:31:12
【问题描述】:

假设我有以下管道:

df.orderBy('foo').limit(10).show()

这里我们可以看到orderBy指令在前,所以数据帧的所有行都应该在limit指令执行之前排序。我发现自己在思考 Spark 是否在管道内进行了一些“重组”以提高性能(例如,在 orderBy 之前执行limit 指令before)。 spark会这样做吗?

【问题讨论】:

    标签: python-3.x apache-spark pyspark


    【解决方案1】:

    你的假设是正确的。 Spark 在合并/收集结果之前在每个分区上执行sort,然后执行limit,我们将在接下来看到。

    orderBy 后跟 limit 将导致下一次调用:

    通过查看TakeOrderedAndProjectExec:doExecute() 方法,我们将首先遇到下一个代码:

    protected override def doExecute(): RDD[InternalRow] = {
        val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
        val localTopK: RDD[InternalRow] = {
          child.execute().map(_.copy()).mapPartitions { iter =>
            org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
          }
        }
    
    ......
    
    

    在这里我们可以看到localTopK 是通过从每个排序分区获取前K 个第一条记录 填充的。这意味着 Spark 会尝试尽快在分区级别下推 topK 过滤器。

    下一行:

    ....
    
    val shuffled = new ShuffledRowRDD(
          ShuffleExchangeExec.prepareShuffleDependency(
            localTopK,
            child.output,
            SinglePartition,
            serializer,
            writeMetrics),
          readMetrics)
        shuffled.mapPartitions { iter =>
          val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
          if (projectList != child.output) {
            val proj = UnsafeProjection.create(projectList, child.output)
            topK.map(r => proj(r))
          } else {
            topK
          }
        }
    

    将从所有分区中生成最终的ShuffledRowRDD,其中将包含构成limit 的最终结果的最终topK 排序记录。

    示例

    让我们通过一个例子来说明这一点。考虑具有1,2,3...20 范围的数据集,该数据集分为两部分。第一个包含奇数,第二个包含偶数,如下所示:

    -----------   -----------
    |   P1    |   |   P2    | 
    -----------   -----------
    |   1     |   |   2     |
    |   3     |   |   4     |
    |   5     |   |   6     |
    |   7     |   |   8     |
    |   9     |   |   10    |
    |  ....   |   |  ....   |
    |   19    |   |   20    |
    -----------   -----------
    

    df.orderBy(...).limit(5) 被执行时,Spark 将从每个分区中获得前 5 个排序记录,即第一个分区为 1-9,第二个分区为 2-10。然后它将合并并排序它们,即序列1,2,3,4,5..10。最后它将获得生成最终列表1,2,3,4,5 的前 5 条记录。

    结论

    Spark 通过省略处理整个数据集但只处理前前 K 行来利用 orderBy 后跟 limit 的所有可用信息。正如@ShemTov 已经提到的,没有必要在orderBy 之前调用limit,因为第一个会返回一个无效的数据集,第二个是因为Spark 会在内部为您进行所有必要的优化。

    【讨论】:

      【解决方案2】:

      Spark 会在需要时进行优化,但在您的情况下,它无法在 orderBy 之前进行限制,因为您会得到不正确的结果。

      这段代码意味着我希望 spark 对 foo 列上的所有行进行排序,然后给我前 10 个。

      【讨论】:

        【解决方案3】:

        确实如此,但无论如何它都不会改变结果。这就是我们称之为优化的原因。

        Spark 为我们提供了两种操作来解决任何问题。

        当我们对任何 RDD 进行转换时,它会给我们一个新的 RDD。但它不会开始执行这些转换。仅当对新的 RDD 执行操作并为我们提供最终结果时才会执行执行。

        因此,一旦您对 RDD 执行任何操作,Spark 上下文就会将您的程序提供给驱动程序。

        驱动程序为您的程序创建 DAG(有向无环图)或执行计划(作业)。一旦创建了 DAG,驱动程序就会将此 DAG 划分为多个阶段。然后将这些阶段划分为更小的任务,并将所有任务交给执行者执行。

        Spark 驱动程序负责将用户程序转换为称为任务的物理执行单元。在高层次上,所有 Spark 程序都遵循相同的结构。他们从一些输入创建 RDD,从那些使用转换的 RDD 派生新的 RDD,并执行操作以收集或保存数据。 Spark 程序隐式地创建操作的逻辑有向无环图 (DAG)。

        驱动程序运行时,会将这个逻辑图转换为物理执行计划。

        【讨论】:

          【解决方案4】:

          这里有两个不同的概念来解释智能查询执行系统是如何工作的,无论是 Spark 还是 RDBMS。

          1.证明查询/执行的准确结果

          一个查询/执行被解析成一个 DAG,代表不同的执行步骤和它们之间的依赖关系。这些步骤可以表示为MapReduce 类型的步骤。每个独立的步骤都是一个“阶段”,两个阶段由一个随机边界分隔。

          可以免费打破阶段之间的这些依赖关系,它们将连续运行(在给定的执行程序中)。

          在我的这篇文章中,我解释了 spark 如何按照提供的顺序执行操作以提供正确的结果 - Spark withColumn and where execution order

          2.快速提供结果

          在一个阶段内,根据 DAG 的定义方式,某些步骤可以并行化。这就是你看到 Spark 会使用许多机制优化执行计划的地方,比如懒惰、先一步运行、催化剂、编码、全阶段代码生成、使用统计信息、谓词下推、列访问、缓存等。新技术是随着事情的发展而添加。这就是 Spark 击败 Hadoop 的地方。在 Hadoop 中,您需要自己编写所有优化,但 Spark 会在后台处理它。同样的 RDBM 也可以工作。如果需要,我可以解释每种技术。

          要处理的数据分散在许多执行器中,这些执行器在不同的执行器上运行相同的“阶段”。这称为可扩展性。随着集群大小的增加(对于大型数据集),作业会运行得更快。此行为与 Hadoop 相同。开发人员在某种程度上仍然负责以某种方式编码,以确保实现最大的并行性。

          让我们看看你的例子 如果orderBy 没有第一次发生,limit 无法提供准确的结果。所以它将按照orderBy 然后limit 的顺序执行。它永远不会重新安排这个执行顺序。

          val df = spark.createDataset(List(("a","b","c"),("a1","b1","c1"),......).toDF("guitarid","make","model")
              df.cache()//without this I was not getting the full plan.
              val df1 = df.orderBy("make").limit(1)
              df1.show(false)
              df1.explain(true)
          

          计划如下。逻辑计划建议执行顺序。物理计划使用特殊阶段“TakeOrderedAndProject”优化了执行。

          == Analyzed Logical Plan ==
          guitarid: string, make: string, model: string
          GlobalLimit 1
          +- LocalLimit 1
             +- Sort [make#8 ASC NULLS FIRST], true
                +- Project [_1#3 AS guitarid#7, _2#4 AS make#8, _3#5 AS model#9]
                   +- LocalRelation [_1#3, _2#4, _3#5]
          
          == Optimized Logical Plan ==
          GlobalLimit 1
          +- LocalLimit 1
             +- Sort [make#8 ASC NULLS FIRST], true
                +- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
                      +- LocalTableScan [guitarid#7, make#8, model#9]
          
          == Physical Plan ==
          TakeOrderedAndProject(limit=1, orderBy=[make#8 ASC NULLS FIRST], output=[guitarid#7,make#8,model#9])
          +- InMemoryTableScan [guitarid#7, make#8, model#9]
                +- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
                      +- LocalTableScan [guitarid#7, make#8, model#9]
          

          如果我们在orderBy 之前调用limit,那么它会保持相同的顺序 - 限制第一个然后排序以确保结果符合您的预期。它不会给出错误的性能结果

              val df1 = df.limit(1).orderBy("make")
              df1.show(false)
              df1.explain(true)
          
          == Analyzed Logical Plan ==
          guitarid: string, make: string, model: string
          Sort [make#8 ASC NULLS FIRST], true
          +- GlobalLimit 1
             +- LocalLimit 1
                +- Project [_1#3 AS guitarid#7, _2#4 AS make#8, _3#5 AS model#9]
                   +- LocalRelation [_1#3, _2#4, _3#5]
          
          == Optimized Logical Plan ==
          Sort [make#8 ASC NULLS FIRST], true
          +- GlobalLimit 1
             +- LocalLimit 1
                +- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
                      +- LocalTableScan [guitarid#7, make#8, model#9]
          
          == Physical Plan ==
          *(2) Sort [make#8 ASC NULLS FIRST], true, 0
          +- *(2) GlobalLimit 1
             +- Exchange SinglePartition
                +- *(1) LocalLimit 1
                   +- InMemoryTableScan [guitarid#7, make#8, model#9]
                         +- InMemoryRelation [guitarid#7, make#8, model#9], StorageLevel(disk, memory, deserialized, 1 replicas)
                               +- LocalTableScan [guitarid#7, make#8, model#9]
          

          另一个例子 - 当你想要加入 2 个数据帧时,Spark 可能会选择 Hashjoin 与 broadcasthashjoin 以获得性能,但最终结果将是相同的。

          另一方面,如果我们有如下代码。由于这两个操作依赖于单独的列,它们可以按任何顺序执行。

          df.withColumn("column10", expression on colum1)
            .withColumn("column11", expression on colum2)
          

          结论 我相信 Spark 的执行引擎会以高效的方式提供准确的结果。性能会随着执行引擎的升级而自动提升,所以只要坚持使用 Spark 的最新语法即可。

          【讨论】:

            【解决方案5】:

            是的! Spark 在执行前对指令进行“基于规则”的优化。 Spark 可以做到这一点,因为所有的转换(.select().orderBy().limit() 等)都是惰性的。

            简而言之,Spark 上下文遵循下一个过程

            • 未解决的逻辑计划:首先,Spark 上下文创建指令而不使用元数据。例如,如果计划中有不存在的列名,则计划不会有问题,因为它没有解决。

            • 逻辑计划:下一步,Spark 用“目录”的数据(例如表名、列名、语义)验证创建的指令

            • 优化的逻辑计划:在这个阶段,指令会因为“催化剂优化器”而改变!

            • 物理计划:在这个最后阶段,我们有了最终指令,这些指令将为 JVM 创建执行代码。

            示例:

            我使用.explain() 来查看最终的物理计划。

            如果我运行此代码:df.orderBy('foo').limit(20).limit(5).explain(),物理计划将是:

            == Physical Plan ==
            TakeOrderedAndProject(limit=5, orderBy=[foo#0L ASC NULLS FIRST], output=[foo#0L])
            +- Scan ExistingRDD[foo#0L]
            

            嗯,有趣..优化后的 Spark 指令删除了.limit(20),因为它没用。 Spark order and project for each partition,以便并行执行此任务。最后,将结果合并并显示最后的前 5 条记录。

            关于您问题中的示例

            在这种情况下:df.orderBy('foo').limit(10).show()

            如果您使用.show() 操作运行此转换(show 中的默认行数为 20),那么 Spark 会将结果限制为 10 条记录( 因为 10 TakeOrderedAndProject 方法)。

            【讨论】:

              猜你喜欢
              • 2011-11-08
              • 2018-09-04
              • 2021-01-16
              • 1970-01-01
              • 2013-08-05
              • 1970-01-01
              • 1970-01-01
              • 2015-11-26
              • 1970-01-01
              相关资源
              最近更新 更多