Posted: 2020-09-28 15:28:21
Question:
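I'm trying to optimize the physical plan for the following transformation:
1. Read data from "pad" and "pi".
2. Find the rows in "pad" that have a reference in "pi" and transform some columns.
3. Find the rows in "pad" that have no reference in "pi" and transform some columns.
4. Union the rows from steps 2 and 3.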
val pad_in_pi = pad
  .join(
    pi
    , $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
    , "inner"
  )
  .selectExpr(
    "pad.AccountingDocumentKeyCode"
    , "pad.RegionId"
    , "pi.PurchaseInvoiceLineNumber as DocumentLineNumber"
    , "pi.CodingBlockSequentialNumber"
  )

val pad_not_in_pi = pad
  .join(
    pi
    , $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
    , "anti"
  )
  .selectExpr(
    "pad.AccountingDocumentKeyCode"
    , "pad.RegionId"
    , "pad.AccountingDocumentLineNumber as DocumentLineNumber"
    , "0001 as CodingBlockSequentialNumber"
  )

pad_in_pi.union(pad_not_in_pi)
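To experiment with this locally, here is a minimal self-contained sketch of the same pipeline. The tiny in-memory tables, the `PadPiRepro` object and the `buildUnion` helper are hypothetical stand-ins for the Delta tables; it assumes Spark 3.x and disables automatic broadcast joins so that the small inputs still go through SortMergeJoin with shuffle exchanges, as in the plan below.

```scala
// Hypothetical local reproduction of the two-branch pipeline (made-up data).
// Assumes Spark 3.x (spark-sql) on the classpath.
import org.apache.spark.sql.{DataFrame, SparkSession}

object PadPiRepro {
  def buildUnion(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Stand-ins for the Delta tables, aliased so that "pad.*" and "pi.*" resolve.
    val pad = Seq[(String, String, String, String)](
      ("AD1", "1", "INV1", "R"), // has a reference in pi
      ("AD2", "2", "INV9", "R"), // reference not present in pi
      ("AD3", "3", null, "R")    // null reference: never matches
    ).toDF("AccountingDocumentKeyCode", "AccountingDocumentLineNumber", "ReferenceKeyCode", "RegionId")
      .as("pad")

    val pi = Seq[(String, String, String, String)](
      ("INV1", "10", "0002", "R"),
      ("INV1", "20", "0003", "R")
    ).toDF("PurchaseInvoiceKeyCode", "PurchaseInvoiceLineNumber", "CodingBlockSequentialNumber", "RegionID")
      .as("pi")

    val padInPi = pad
      .join(pi, $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode", "inner")
      .selectExpr(
        "pad.AccountingDocumentKeyCode",
        "pad.RegionId",
        "pi.PurchaseInvoiceLineNumber as DocumentLineNumber",
        "pi.CodingBlockSequentialNumber")

    val padNotInPi = pad
      .join(pi, $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode", "anti")
      .selectExpr(
        "pad.AccountingDocumentKeyCode",
        "pad.RegionId",
        "pad.AccountingDocumentLineNumber as DocumentLineNumber",
        "0001 as CodingBlockSequentialNumber") // parsed as the integer literal 1, as in the question's plan

    padInPi.union(padNotInPi)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pad-pi-repro")
      // Tiny inputs would otherwise be broadcast; disable that to get SortMergeJoin as in the question.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    val result = buildUnion(spark)
    result.explain() // inspect the physical plan for Exchange / ReusedExchange nodes
    result.show(false)
    spark.stop()
  }
}
```

Running `main` prints the physical plan; a reused shuffle would show up there as a `ReusedExchange` node.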
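Branches 2 and 3 use the same join expression, so the exchange could be reused, but the current physical plan does not reuse it. What could be the reason?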
== Physical Plan ==
Union
:- *(3) Project [AccountingDocumentKeyCode#491, RegionId#539, PurchaseInvoiceLineNumber#205 AS DocumentLineNumber#954, CodingBlockSequentialNumber#203]
:  +- *(3) SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], Inner
:     :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#684]
:     :     +- *(1) Project [AccountingDocumentKeyCode#491, ReferenceKeyCode#538, RegionId#539]
:     :        +- *(1) Filter ((isnotnull(RegionId#539) AND (RegionId#539 = R)) AND isnotnull(ReferenceKeyCode#538))
:     :           +- *(1) ColumnarToRow
:     :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R), isnotnull(ReferenceKeyCode#538)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R), IsNotNull(ReferenceKeyCode)], ReadSchema: struct<AccountingDocumentKeyCode:string,ReferenceKeyCode:string,RegionId:string>
:     +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#692]
:           +- *(2) Project [CodingBlockSequentialNumber#203, PurchaseInvoiceLineNumber#205, PurchaseInvoiceKeyCode#235]
:              +- *(2) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
:                 +- *(2) ColumnarToRow
:                    +- FileScan parquet default.purchaseinvoice_delta[CodingBlockSequentialNumber#203,PurchaseInvoiceLineNumber#205,RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<CodingBlockSequentialNumber:string,PurchaseInvoiceLineNumber:string,RegionID:string,Purcha...
+- *(6) Project [AccountingDocumentKeyCode#491, RegionId#539, AccountingDocumentLineNumber#492 AS DocumentLineNumber#1208, 1 AS CodingBlockSequentialNumber#1210]
   +- SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], LeftAnti
      :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#703]
      :     +- *(4) Project [AccountingDocumentKeyCode#491, AccountingDocumentLineNumber#492, ReferenceKeyCode#538, RegionId#539]
      :        +- *(4) Filter (isnotnull(RegionId#539) AND (RegionId#539 = R))
      :           +- *(4) ColumnarToRow
      :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,AccountingDocumentLineNumber#492,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R)], ReadSchema: struct<AccountingDocumentKeyCode:string,AccountingDocumentLineNumber:string,ReferenceKeyCode:stri...
      +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#710]
            +- *(5) Project [PurchaseInvoiceKeyCode#235]
               +- *(5) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
                  +- *(5) ColumnarToRow
                     +- FileScan parquet default.purchaseinvoice_delta[RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<RegionID:string,PurchaseInvoiceKeyCode:string>
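Reading the tree by eye works, but shuffle exchanges and reused exchanges can also be counted from `queryExecution.executedPlan`. The sketch below is a hypothetical helper (`ExchangeReuseCheck`, `exchangeCounts`), assuming Spark 3.x with adaptive query execution turned off, since with AQE the executed plan is wrapped in an `AdaptiveSparkPlanExec` node until the query has run. The demo unions an aggregation with itself, so both branches shuffle identical input and the second shuffle is expected to be replaced by a reused exchange.

```scala
// Hypothetical helper: count shuffle exchanges and reused exchanges in a physical plan.
// Assumes Spark 3.x with adaptive query execution disabled.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.functions.col

object ExchangeReuseCheck {
  /** Returns (number of ShuffleExchangeExec nodes, number of ReusedExchangeExec nodes). */
  def exchangeCounts(df: DataFrame): (Int, Int) = {
    val plan = df.queryExecution.executedPlan
    // ReusedExchangeExec is a leaf node, so the exchange it points to is not counted twice.
    val shuffles = plan.collect { case s: ShuffleExchangeExec => s }
    val reused = plan.collect { case r: ReusedExchangeExec => r }
    (shuffles.size, reused.size)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    // Two identical aggregation branches: their shuffles have the same subtree.
    val agg = spark.range(0, 100).groupBy((col("id") % 10).as("k")).count()
    val (shuffles, reused) = exchangeCounts(agg.union(agg))
    println(s"shuffle exchanges: $shuffles, reused exchanges: $reused")
    spark.stop()
  }
}
```

Spark only reuses an exchange when the whole subtree under it produces the same result after canonicalization, so differing filters or differing projected column lists below two otherwise similar exchanges are enough to prevent reuse.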
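Comments:
- The Filter operators are not the same: your inner join has an extra isnotnull(ReferenceKeyCode#538). Maybe you can adjust your queries to make them identical?
- @RaphaelRoth Nice! After making the Filter and Project operators identical, the plan reuses the exchange.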
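A different approach, not the one used in the comments: the rows of an inner join plus the rows of the corresponding left anti join are exactly the rows of a left outer join, so both branches can be produced by a single join, with at most one shuffle per input and nothing left to reuse. This is a hedged sketch; `PadPiLeftOuter` and `padWithPi` are hypothetical names and it assumes Spark 3.x. It also assumes the placeholder for unmatched rows should be the string "0001": in the question's code, `0001` inside `selectExpr` is parsed as the integer literal 1, which is why the plan shows `1 AS CodingBlockSequentialNumber`.

```scala
// Alternative sketch: one left outer join instead of inner join + left anti join + union.
// Assumes Spark 3.x and the column names used in the question.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object PadPiLeftOuter {
  def padWithPi(pad: DataFrame, pi: DataFrame): DataFrame = {
    // Matched rows have a non-null pi join key; unmatched rows get nulls on the pi side.
    val matched = col("pi.PurchaseInvoiceKeyCode").isNotNull
    pad.as("pad")
      .join(pi.as("pi"), col("pad.ReferenceKeyCode") === col("pi.PurchaseInvoiceKeyCode"), "left_outer")
      .select(
        col("pad.AccountingDocumentKeyCode"),
        col("pad.RegionId"),
        when(matched, col("pi.PurchaseInvoiceLineNumber"))
          .otherwise(col("pad.AccountingDocumentLineNumber"))
          .as("DocumentLineNumber"),
        // Assumption: a zero-padded string "0001" is intended for unmatched rows.
        when(matched, col("pi.CodingBlockSequentialNumber"))
          .otherwise(lit("0001"))
          .as("CodingBlockSequentialNumber"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    // Made-up sample data for illustration only.
    val pad = Seq[(String, String, String, String)](
      ("AD1", "1", "INV1", "R"), ("AD2", "2", "INV9", "R"), ("AD3", "3", null, "R")
    ).toDF("AccountingDocumentKeyCode", "AccountingDocumentLineNumber", "ReferenceKeyCode", "RegionId")
    val pi = Seq(("INV1", "10", "0002", "R"), ("INV1", "20", "0003", "R"))
      .toDF("PurchaseInvoiceKeyCode", "PurchaseInvoiceLineNumber", "CodingBlockSequentialNumber", "RegionID")
    padWithPi(pad, pi).show(false)
    spark.stop()
  }
}
```

Detecting a match through `pi.PurchaseInvoiceKeyCode` being non-null is safe here because the equality join condition can only be true when that key is non-null; pad rows with a null `ReferenceKeyCode` end up on the unmatched side, just as they do in the anti join.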
Tags: apache-spark