【Question title】: Spark SQL physical plan doesn't reuse exchange
【Posted】: 2020-09-28 15:28:21
【Question】:

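I am trying to optimize the physical plan for the following transformation.

  1. Read data from 'pad' and 'pi'.
  2. Find the rows in 'pad' that have a reference in 'pi' and transform some columns.
  3. Find the rows in 'pad' that have no reference in 'pi' and transform some columns.
  4. Union the rows from 2 and 3.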
val pad_in_pi = pad
  .join(
    pi
    , $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
    , "inner"
  )
  .selectExpr(
    "pad.AccountingDocumentKeyCode"
    , "pad.RegionId"
    , "pi.PurchaseInvoiceLineNumber as DocumentLineNumber"
    , "pi.CodingBlockSequentialNumber"
  )
  
val pad_not_in_pi = pad
  .join(
    pi
    , $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
    , "anti"
  )
  .selectExpr(
    "pad.AccountingDocumentKeyCode"
    , "pad.RegionId"
    , "pad.AccountingDocumentLineNumber as DocumentLineNumber"
    , "0001 as CodingBlockSequentialNumber"
  )
  
  pad_in_pi.union(pad_not_in_pi)

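Branches 2 and 3 use the same join expression, so the exchange could be reused. The current physical plan does not reuse it. What could be the reason?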

== Physical Plan ==
Union
:- *(3) Project [AccountingDocumentKeyCode#491, RegionId#539, PurchaseInvoiceLineNumber#205 AS DocumentLineNumber#954, CodingBlockSequentialNumber#203]
:  +- *(3) SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], Inner
:     :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#684]
:     :     +- *(1) Project [AccountingDocumentKeyCode#491, ReferenceKeyCode#538, RegionId#539]
:     :        +- *(1) Filter ((isnotnull(RegionId#539) AND (RegionId#539 = R)) AND isnotnull(ReferenceKeyCode#538))
:     :           +- *(1) ColumnarToRow
:     :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R), isnotnull(ReferenceKeyCode#538)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R), IsNotNull(ReferenceKeyCode)], ReadSchema: struct<AccountingDocumentKeyCode:string,ReferenceKeyCode:string,RegionId:string>
:     +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#692]
:           +- *(2) Project [CodingBlockSequentialNumber#203, PurchaseInvoiceLineNumber#205, PurchaseInvoiceKeyCode#235]
:              +- *(2) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
:                 +- *(2) ColumnarToRow
:                    +- FileScan parquet default.purchaseinvoice_delta[CodingBlockSequentialNumber#203,PurchaseInvoiceLineNumber#205,RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<CodingBlockSequentialNumber:string,PurchaseInvoiceLineNumber:string,RegionID:string,Purcha...
+- *(6) Project [AccountingDocumentKeyCode#491, RegionId#539, AccountingDocumentLineNumber#492 AS DocumentLineNumber#1208, 1 AS CodingBlockSequentialNumber#1210]
   +- SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], LeftAnti
      :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#703]
      :     +- *(4) Project [AccountingDocumentKeyCode#491, AccountingDocumentLineNumber#492, ReferenceKeyCode#538, RegionId#539]
      :        +- *(4) Filter (isnotnull(RegionId#539) AND (RegionId#539 = R))
      :           +- *(4) ColumnarToRow
      :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,AccountingDocumentLineNumber#492,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R)], ReadSchema: struct<AccountingDocumentKeyCode:string,AccountingDocumentLineNumber:string,ReferenceKeyCode:stri...
      +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#710]
            +- *(5) Project [PurchaseInvoiceKeyCode#235]
               +- *(5) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
                  +- *(5) ColumnarToRow
                     +- FileScan parquet default.purchaseinvoice_delta[RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<RegionID:string,PurchaseInvoiceKeyCode:string>

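【Comments】:

  • The Filter operators are not the same: the inner join has an additional isnotnull(ReferenceKeyCode#538). Maybe you can adjust your query so that they are identical?
  • @RaphaelRoth Nice catch! The plan reuses the exchange once the Filter and Project operators are made identical.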
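
To check whether a given query reuses an exchange without reading the whole plan by eye, something like the following sketch may help. It is not the asker's code: the object name, the helper `reusesExchange`, and the toy `spark.range` data are hypothetical, and it uses an aggregation rather than the joins from the question. It assumes adaptive query execution is disabled (the Spark 3.0 default), because with AQE enabled the plan printed before execution may not show reuse yet. The point is only that two branches share a shuffle when their subtrees below the Exchange are identical, and an extra Filter on one side is enough to break that.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExchangeReuseCheck {
  // True when the executed physical plan contains a ReusedExchange node.
  def reusesExchange(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("ReusedExchange")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("exchange-reuse-check")
      // Assumption: AQE off, so the pre-execution plan already shows reuse.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Hypothetical toy data: k is the shuffle key, v is a payload column.
    val base = spark.range(0, 1000).selectExpr("id % 10 as k", "id as v")

    // Both aggregations shuffle on k; only the second has an extra Filter.
    val plain = base.groupBy("k").count()
    val filtered = base.filter("v % 2 = 0").groupBy("k").count()

    println(s"identical branches: ${reusesExchange(plain.union(plain))}")
    println(s"different Filter:   ${reusesExchange(plain.union(filtered))}")

    spark.stop()
  }
}
```

For the query in the question, the same helper could be called on pad_in_pi.union(pad_not_in_pi) before and after aligning the Filter and Project operators of the two branches.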

Tags: apache-spark


【Solution 1】:

This does not directly answer the exchange reuse question, but try a left outer join to get rid of the union:

pad.join(
    pi
    , $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
    , "left_outer"
  )
  .selectExpr(
    "pad.AccountingDocumentKeyCode"
    , "pad.RegionId"
    , "coalesce(pi.PurchaseInvoiceLineNumber, pad.AccountingDocumentLineNumber) as DocumentLineNumber"
    , "coalesce(pi.CodingBlockSequentialNumber, '0001') as CodingBlockSequentialNumber"
  )
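
One caveat with coalesce: if a matched 'pi' row has a null PurchaseInvoiceLineNumber or CodingBlockSequentialNumber, coalesce falls back to the 'pad' value or '0001', whereas the inner-join branch in the question would keep the null. If that difference matters, a variant can branch on whether the join key matched. The sketch below is only an illustration: the object and method names and the toy rows are hypothetical, and only the column names come from the question. Note also that '0001' is quoted here, as in the answer; the unquoted 0001 in the question appears as 1 in the physical plan.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object LeftOuterInsteadOfUnion {
  // Expects DataFrames already aliased as "pad" and "pi".
  def leftOuter(pad: DataFrame, pi: DataFrame): DataFrame = {
    // An equality join never matches a null key, so after the left outer
    // join a non-null pi key means the pad row found a partner in pi.
    val matched = col("pi.PurchaseInvoiceKeyCode").isNotNull
    pad
      .join(pi, col("pad.ReferenceKeyCode") === col("pi.PurchaseInvoiceKeyCode"), "left_outer")
      .select(
        col("pad.AccountingDocumentKeyCode"),
        col("pad.RegionId"),
        when(matched, col("pi.PurchaseInvoiceLineNumber"))
          .otherwise(col("pad.AccountingDocumentLineNumber")).as("DocumentLineNumber"),
        when(matched, col("pi.CodingBlockSequentialNumber"))
          .otherwise(lit("0001")).as("CodingBlockSequentialNumber")
      )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("left-outer-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical toy rows: A1 has a match in pi, A2 does not.
    val pad = Seq(("A1", "R", "10", "K1"), ("A2", "R", "20", "K9"))
      .toDF("AccountingDocumentKeyCode", "RegionId", "AccountingDocumentLineNumber", "ReferenceKeyCode")
      .alias("pad")
    // The matched pi row deliberately has a null CodingBlockSequentialNumber.
    val pi = Seq[(String, String, String)](("K1", "1", null))
      .toDF("PurchaseInvoiceKeyCode", "PurchaseInvoiceLineNumber", "CodingBlockSequentialNumber")
      .alias("pi")

    leftOuter(pad, pi).show()
    spark.stop()
  }
}
```

With these toy rows the matched row A1 keeps its null CodingBlockSequentialNumber, and only the unmatched row A2 receives '0001'. Like the original answer, this replaces both branches and the union with a single join.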

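【Comments】:

  • I would use a LEFT OUTER JOIN, but the transformations on the two branches also need to join two different datasets, one per branch, before the results are unioned back together. I suppose I should have added this detail to the question. Is there a better way to code this?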