【发布时间】:2018-11-30 17:00:20
【问题描述】:
我正在使用 spark sql 来处理数据。这是查询
select
/*+ BROADCAST (C) */ A.party_id,
IF(B.master_id is NOT NULL, B.master_id, 'MISSING_LINK') as master_id,
B.is_matched,
D.partner_name,
A.partner_id,
A.event_time_utc,
A.funnel_stage_type,
A.product_id_set,
A.ip_address,
A.session_id,
A.tdm_retailer_id,
C.product_name ,
CASE WHEN C.product_category_lvl_01 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_01 END as product_category_lvl_01,
CASE WHEN C.product_category_lvl_02 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_02 END as product_category_lvl_02,
CASE WHEN C.product_category_lvl_03 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_03 END as product_category_lvl_03,
CASE WHEN C.product_category_lvl_04 is NULL THEN 'OUTOFSALE' ELSE product_category_lvl_04 END as product_category_lvl_04,
C.brand_name
from
browser_data A
INNER JOIN (select partner_name, partner_alias_tdm_id as npa_retailer_id from npa_retailer) D
ON (A.tdm_retailer_id = D.npa_retailer_id)
LEFT JOIN
(identity as B1 INNER JOIN (select random_val from random_distribution) B2) as B
ON (A.party_id = B.party_id and A.random_val = B.random_val)
LEFT JOIN product_taxonomy as C
ON (A.product_id = C.product_id and D.npa_retailer_id = C.retailer_id)
在哪里, browser_data A - 大约 110 GB 的数据,包含 5.19 亿条记录,
D - 仅映射到 A 中的一个值的小型数据集。由于这是小型 spark sql 自动广播它(在解释中的执行计划中确认)
B - 5 GB 的 4500 万条记录仅包含 3 列。该数据集被复制 30 次(使用包含 0 到 29 个值的数据集的笛卡尔积完成),从而解决了倾斜键(大量数据与数据集 A 中的一个相对)问题。这将产生 150 GB 的数据。
C - 900 MB,包含 900 万条记录。这是通过广播加入 A 加入的(所以没有随机播放)
以上查询效果很好。但是当我看到 spark UI 时,我可以观察到上面的查询触发了 6.8 TB 的随机读取。由于数据集 D 和 C 作为广播加入,因此不会导致任何洗牌。所以只有 A 和 B 的连接应该引起洗牌。即使我们认为所有数据都是随机读取的,那么它也应该限制在 110 GB (A) + 150 GB (B) = 260 GB。为什么它会触发 6.8 TB 的随机读取和 40 GB 的随机写入。 任何帮助表示赞赏。提前谢谢你
谢谢
马尼什
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql pyspark-sql