【问题标题】:Spark sql query causing huge data shuffle read / writeSpark sql查询导致巨大的数据洗牌读/写
【发布时间】:2018-11-30 17:00:20
【问题描述】:

我正在使用 spark sql 来处理数据。这是查询

select 
    /*+ BROADCAST (C) */ A.party_id, 
    IF(B.master_id is NOT NULL, B.master_id, 'MISSING_LINK') as master_id, 
    B.is_matched, 
    D.partner_name, 
    A.partner_id, 
    A.event_time_utc, 
    A.funnel_stage_type, 
    A.product_id_set, 
    A.ip_address, 
    A.session_id, 
    A.tdm_retailer_id, 
    C.product_name ,
    CASE WHEN C.product_category_lvl_01 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_01 END as product_category_lvl_01,
    CASE WHEN C.product_category_lvl_02 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_02 END as product_category_lvl_02,
    CASE WHEN C.product_category_lvl_03 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_03 END as product_category_lvl_03,
    CASE WHEN C.product_category_lvl_04 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_04 END as product_category_lvl_04, 
    C.brand_name 
from 
    browser_data A 
    INNER JOIN (select partner_name, partner_alias_tdm_id as npa_retailer_id from npa_retailer) D  
        ON (A.tdm_retailer_id = D.npa_retailer_id) 
    LEFT JOIN 
        (identity as B1 INNER JOIN (select random_val from random_distribution) B2) as B 
        ON (A.party_id = B.party_id and A.random_val = B.random_val) 
    LEFT JOIN product_taxonomy as C 
        ON (A.product_id = C.product_id and D.npa_retailer_id = C.retailer_id)

在哪里, browser_data A - 大约 110 GB 的数据,包含 5.19 亿条记录,

D - 仅映射到 A 中的一个值的小型数据集。由于这是小型 spark sql 自动广播它(在解释中的执行计划中确认)

B - 5 GB 的 4500 万条记录仅包含 3 列。该数据集被复制 30 次(使用包含 0 到 29 个值的数据集的笛卡尔积完成),从而解决了倾斜键(大量数据与数据集 A 中的一个相对)问题。这将产生 150 GB 的数据。

C - 900 MB,包含 900 万条记录。这是通过广播加入 A 加入的(所以没有随机播放)

以上查询效果很好。但是当我看到 spark UI 时,我可以观察到上面的查询触发了 6.8 TB 的随机读取。由于数据集 D 和 C 作为广播加入,因此不会导致任何洗牌。所以只有 A 和 B 的连接应该引起洗牌。即使我们认为所有数据都是随机读取的,那么它也应该限制在 110 GB (A) + 150 GB (B) = 260 GB。为什么它会触发 6.8 TB 的随机读取和 40 GB 的随机写入。 任何帮助表示赞赏。提前谢谢你

谢谢

马尼什

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    我要做的第一件事是在上面使用DataFrame.explain。这将向您显示执行计划,以便您可以准确了解实际发生的情况。我会检查输出以确认广播连接确实发生了。 Spark 有一个设置来控制你的数据在放弃广播连接之前可以有多大。

    我还要注意,您针对 random_distribution 的 INNER JOIN 看起来很可疑。我可能错误地重新创建了您的架构,但是当我解释时我得到了这个:

    scala> spark.sql(sql).explain
    == Physical Plan ==
    org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
    LocalRelation [party_id#99]
    and
    LocalRelation [random_val#117]
    Join condition is missing or trivial.
    Use the CROSS JOIN syntax to allow cartesian products between these relations.;
    

    最后,您的输入数据是否经过压缩?您可能会看到大小差异,因为您的数据不再被压缩,并且因为它被序列化的方式。

    【讨论】:

    • 感谢您的评论。我确认数据集 C 和 D 正在发生广播连接。我正在使用使用 snappy 压缩器的 AVRO 格式数据。
    猜你喜欢
    • 2016-07-24
    • 2016-04-29
    • 2015-07-25
    • 1970-01-01
    • 2017-08-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多