这是《漫谈分布式系统》系列的第 22 篇,预计会写 30 篇左右。扫描文末二维码,关注公众号,听我娓娓道来。也欢迎转发朋友圈分享给更多人。  

经验的局限性

前面两篇文章,我们介绍了大数据下的 RBO 和 CBO 这两种 SQL 查询性能优化手段。

RBO 忠于既定规则,而 CBO 更关心实际代价,从根本上来说,是两种思路完全不同的优化方式。

虽然各有侧重点,但也有共性。

CBO 虽说基于代价,但也是在几个备选计划中,再去挑选代价最小的。而备选计划也不是随机来的,通常也是基于经验挑选的。

比如常被拿来做例子的 join order,就是在以往经验中,发现 join 的顺序不同,对性能有很大影响,才做成不同的备选计划去接受评估。

从这个角度讲,广义地看,RBO 和 CBO 都是基于经验的

只不过 CBO 给这些经验加入了一些变量,也就是各种对实际数据的统计指标。

然而,上一篇文章我们已经说过,这些统计指标是有缺点的:

  • 统计指标的计算是需要消耗成本的,导致指标可能未及时更新,甚至根本没有指标,

  • 有些指标需要估算,可能不准,比如 filter selectivity 是近似估算,

  • 统计指标可能不能保证及时更新,比如数据刚更新了就被使用,而统计指标还没算完。

这类问题可以概括为统计指标的不准确性

统计指标的引入,使得优化措施更准更灵活,但上述缺点又使得统计指标不够准、不够灵活。

再加上另一个普遍存在的问题:阈值类的设置,往往不具有普适性,一些情况下反倒会拖累性能。比如 spark.sql.shuffle.partitions,默认值是 200,但很显然数据量不同时,合适的 partition 数是不一样的。

静态配置的存在,也使得优化手段的选择不够准、不够灵活。

经验是死的,我们需要更加准、更加灵活的优化方式。

Spark AQE 的尝试

要想更准,就要及时、全覆盖的统计数据;要想更灵活,就要有相对动态的阈值设置。

对这两个问题,Spark 在 3.0 版本引入的 Adaptive Query Execution 技术,示范了很好的思路。

  • 第一个问题,不再依赖实现 analyze 得到的不及时和不准确的统计数据,而是在运行时分析数据。

  • 第二个问题,不再采用直接写死的静态配置,Spark 使用相对间接的配置来评估。

下面结合 Spark AQE 当前主要支持的三种优化来看下。

动态合并 shuffle 分区

前面已经提到,在 Spark 里, shuffle 分区数,是通过静态配置设置的。这也就必然导致分区数的不普适。

漫谈分布式系统(22) -- 自适应的优化

比如上面这个例子,很显然 2 3 4 这三个分区都很小了,如果能合并就好了。

漫谈分布式系统(22) -- 自适应的优化

要想达到这样的效果,就需要更加间接和灵活的设置,而不是直接写死分区数。

想想,我们刚才是怎么判断 2 3 4 那几个分区偏小的?是根据分区数据量。

所以,可以把控制分区数的设置参数,从直接设置分区数,改为设置每个分区的数据量大小。

当然,这个分区数量的大小依然是个经验值,会受到经验的局限性。但很显然,效果比之前直接设置要准确和灵活多了。

而设置分区大小后,就有两种可能:

  • 太小的分区需要合并

  • 太大的分区需要拆分。

合并好说,拆分就可能导致 shuffle,拖累性能。所以 Spark AQE 采用了统一先设置一个较大的分区数,再合并小分区的策略。

可以看到,这个优化解决的是静态配置不灵活的问题。

动态调整 join 策略

这个系列前面有篇文章,专门讲过不同 join 策略对性能的影响。

其中,broadcast join 是 Spark 效率最高的 join。但由于需要把数据放在内存,所以适用于小表。

多小算小,Spark 通过 spark.sql.autoBroadcastJoinThreshold 参数来控制这个阈值。

但这个参数考察的是源表的大小,一旦碰到 filter 等操作,过滤之后的数据可能就远小于源表大小了,就算估算也可能卡在边界值附近。

漫谈分布式系统(22) -- 自适应的优化

如上图这样,AQE 就能在运行时去评估数据大小,然后把 sort merge join 优化为 broadcast join。

可以看到,这个优化解决的是估算不准确的问题。

动态优化倾斜 join

数据倾斜是很常见的优化场景,往往也需要耗费大量的时间。但其实是有通用的解决办法的,适合自动去做。

比如下面这个场景:

漫谈分布式系统(22) -- 自适应的优化

A0 这个分区由于数据倾斜,会显著影响整体执行时间。常见的优化是拆分之后再和对应分区 join,然后再合并结果。

漫谈分布式系统(22) -- 自适应的优化

刚才说过,倾斜的解决办法很通用,在代码层面也很好处理。

问题在于,怎么样算倾斜?像分区数那样做成静态配置?显然不妥。

AQE 采用的思路是,绝对值不灵活,就用相对值。如果有个分区的大小超过其他分区的 spark.sql.adaptive.skewJoin.skewedPartitionFactor 倍,默认是 5 倍,就认为它倾斜了,需要拆分。

可以看到,这个优化除了提供解决数据倾斜的通用办法外,也是在解决静态配置不普适的问题。


从这三个例子,我们就能看到 Spark AQE 的思路。

  • 通过运行时统计数据的采集,解决不及时和不准确的问题,

  • 通过间接和相对值类型的阈值设置,解决直接静态配置不普适的问题。

而 Spark 按照 DAG 划分 stage 调度的方式,使得在前一个 stage 结束后,能拿到完整的统计信息,然后就能重新去做即时优化。这样,AQE 就能在不影响整体执行框架的情况下,在恰当的位置介入。

作为对 RBO 和 CBO 的补充和改进,AQE 的思路是非常值得我们思考和借鉴的,期待在 Spark 中有更多的应用,也希望其他计算引擎能参考。

关联阅读

漫谈分布式系统(20) -- 基于规则的优化

漫谈分布式系统(21) -- 基于代价的优化

原创不易

关注/分享/赞赏

给我坚持的动力

漫谈分布式系统(22) -- 自适应的优化

相关文章:

  • 2021-04-10
  • 2021-12-13
  • 2022-02-07
  • 2021-06-27
  • 2021-07-24
  • 2021-08-19
  • 2021-10-21
  • 2022-12-23
猜你喜欢
  • 2021-11-14
  • 2021-11-17
  • 2021-04-16
  • 2021-07-04
  • 2021-12-03
  • 2022-12-23
  • 2021-08-17
相关资源
相似解决方案