GaussDB(DWS)分布式计算的倾斜优化

数据倾斜问题是分布式架构的重要难题,它破坏了MPP架构中各个节点对等的要求,导致单节点(倾斜节点)所存储或者计算的数据量远大于其他节点,造成以下危害:

  • 存储上的倾斜会严重限制系统容量,在系统容量不饱和的情况下,由于单节点倾斜的限制,使得整个系统容量无法继续增长。
  • 计算上的倾斜会严重影响系统性能,由于倾斜节点所需要运算的数据量远大于其它节点,导致倾斜节点计算,降低作业整体性能。
  • 数据倾斜还严重影响了MPP架构的扩展性。由于在存储或者计算时,往往会将相同值的数据放到同一节点,当出现大量计算倾斜时,系统瓶颈受限于倾斜节点的容量或者性能,扩容无法达到线性或者近似线性的扩招效果

GaussDB(DWS)通过多分布列的形式可以有效的解决存储倾斜的场景,本文主要介绍GaussDB(DWS)针对数据计算倾斜的解决方案和实际效果

1. 预置条件

CREATE TABLE t1(a int, b int) DISTRIBUTE BY HASH(a);
CREATE TABLE t2(a int, b int) DISTRIBUTE BY HASH(a);
INSERT INTO t1 VALUES (generate_series(1, 1000), generate_series(1, 1000));
INSERT INTO t1 SELECT a+1000*2, b+1000*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2, b+1000*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2, b+1000*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2, b+1000*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2, b+1000*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2, b+1000*2*2*2*2*2*2 FROM t1;
INSERT INTO t1 SELECT a+1000*2*2*2*2*2*2*2, b+1000*2*2*2*2*2*2*2 FROM t1;
INSERT INTO t2 SELECT a, 1 FROM t1;
ANALYZE t1;
ANALYZE t2;

2. 执行概述

以如下SQL为例,分析普通执行机制和倾斜优化执行机制的差异

SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;

2.1 非倾斜优化执行机制

GaussDB(DWS)默认采取倾斜优化执行机制。在生成执行计划阶段,优化器根据统计信息识别join列上是否存在倾斜情况,如果存在倾斜,则自动生成倾斜优化计划。同时GaussDB(DWS)提供了配置参数skew_option,设置此参数为off之后,优化器会skip倾斜优化计划。

postgres=# SET skew_option TO off;
SET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
id |                 operation                  |       A-time       | A-rows | E-rows | Peak Memory  | E-memory | A-width | E-width | E-costs
----+--------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+----------
1 | ->  Aggregate                              | 371.797            |      1 |      1 | 10KB         |          |         |       8 | 14928.22
2 |    ->  Streaming (type: GATHER)            | 371.693            |      4 |      4 | 79KB         |          |         |       8 | 14928.22
3 |       ->  Aggregate                        | [236.825, 350.166] |      4 |      4 | [10KB, 10KB] | 1MB      |         |       8 | 14918.22
4 |          ->  Hash Join (5,7)               | [236.817, 298.880] | 128000 | 128000 | [4KB, 5KB]   | 1MB      |         |       0 | 14838.21
5 |             ->  Streaming(type: BROADCAST) | [82.726, 144.617]  | 512000 | 512000 | [53KB, 53KB] | 2MB      |         |       4 | 13168.21
6 |                ->  Seq Scan on t2          | [15.528, 25.093]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 469.00
7 |             ->  Hash                       | [35.826, 69.033]   | 128000 | 128000 | [1MB, 1MB]   | 16MB     | [20,20] |       4 | 470.00
8 |                ->  Seq Scan on t1          | [17.163, 35.748]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 470.00
Predicate Information (identified by plan id)
---------------------------------------------
4 --Hash Join (5,7)
Hash Cond: (t2.b = t1.b)
Memory Information (identified by plan id)
--------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 1MB
Datanode:
Max Query Peak Memory: 4MB
Min Query Peak Memory: 4MB
7 --Hash
Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 1130kB
Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 1120kB
User Define Profiling
----------------------------------------------------------------
Segment Id: 3  Track name: Datanode build connection
(actual time=[0.759, 15.526], calls=[1, 1])
Segment Id: 3  Track name: Datanode wait connection
(actual time=[0.001, 0.638], calls=[1, 1])
Plan Node id: 2  Track name: coordinator get datanode connection
(actual time=[0.032, 0.032], calls=[1, 1])
====== Query Summary =====
-------------------------------------------------------------------------------
Datanode executor start time [dn_6003_6004, dn_6001_6002]: [2.036 ms,16.202 ms]
Datanode executor end time [dn_6007_6008, dn_6001_6002]: [0.606 ms,1.310 ms]
System available mem: 3112960KB
Query Max mem: 3112960KB
Query estimated mem: 6690KB
Coordinator executor start time: 0.529 ms
Coordinator executor run time: 372.044 ms
Coordinator executor end time: 0.436 ms
Planner runtime: 1.156 ms
Query Id: 72339069014639413
Total runtime: 373.079 ms
(49 rows)

正常机制下,t1和t2表都会按照b列做数据重分布,重分布之后在各个DN上进行JOIN操作。但是表t2的数据比较特殊,t2的字段b上值为1比例高达100%,重分布之后值1所在DN上的计算量会比承载所有的计算量,这种就是非常极端的计算倾斜的场景,我们一般称之为计算倾斜。计算倾斜一般会导致两个问题

1)重负载DN上资源开销比较大,导致集群负载不均衡

2)重负载DN上计算耗时较长,短板效应导致SQL执行时间变长

针对这种场景GaussDB(DWS)进行了特殊的倾斜执行优化

2.2 倾斜优化执行机制

postgres=# RESET skew_option;
RESET
postgres=# explain analyze SELECT count(1) FROM t1 INNER JOIN t2 ON t1.b = t2.b;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id |                              operation                               |       A-time       | A-rows | E-rows | Peak Memory  | E-memory | A-width | E-width | E-costs
----+----------------------------------------------------------------------+--------------------+--------+--------+--------------+----------+---------+---------+---------
1 | ->  Aggregate                                                        | 182.837            |      1 |      1 | 10KB         |          |         |       8 | 7309.01
2 |    ->  Streaming (type: GATHER)                                      | 182.748            |      4 |      4 | 79KB         |          |         |       8 | 7309.01
3 |       ->  Aggregate                                                  | [158.711, 169.947] |      4 |      4 | [10KB, 10KB] | 1MB      |         |       8 | 7299.01
4 |          ->  Hash Join (5,7)                                         | [147.187, 156.273] | 128000 | 128000 | [5KB, 5KB]   | 1MB      |         |       0 | 7219.00
5 |             ->  Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)   | [16.308, 23.164]   | 128000 | 128000 | [53KB, 53KB] | 2MB      |         |       4 | 3189.00
6 |                ->  Seq Scan on t2                                    | [14.062, 17.578]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 469.00
7 |             ->  Hash                                                 | [87.374, 92.295]   | 128003 | 128000 | [1MB, 1MB]   | 16MB     | [20,20] |       4 | 3190.00
8 |                ->  Streaming(type: PART REDISTRIBUTE PART BROADCAST) | [51.194, 56.189]   | 128003 | 128000 | [61KB, 61KB] | 2MB      |         |       4 | 3190.00
9 |                   ->  Seq Scan on t1                                 | [13.989, 15.170]   | 128000 | 128000 | [12KB, 12KB] | 1MB      |         |       4 | 470.00
Predicate Information (identified by plan id)
--------------------------------------------------------
4 --Hash Join (5,7)
Hash Cond: (t2.b = t1.b)
Skew Join Optimized by Statistic
5 --Streaming(type: PART REDISTRIBUTE PART ROUNDROBIN)
Skew Filter: (b = 1)
8 --Streaming(type: PART REDISTRIBUTE PART BROADCAST)
Skew Filter: (b = 1)
Memory Information (identified by plan id)
--------------------------------------------------------------------
Coordinator Query Peak Memory:
Query Peak Memory: 1MB
Datanode:
Max Query Peak Memory: 6MB
Min Query Peak Memory: 6MB
7 --Hash
Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 1131kB
Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 1120kB
User Define Profiling
----------------------------------------------------------------
Segment Id: 3  Track name: Datanode build connection
(actual time=[2.924, 7.101], calls=[1, 1])
Segment Id: 3  Track name: Datanode wait connection
(actual time=[0.000, 1.478], calls=[1, 1])
Plan Node id: 2  Track name: coordinator get datanode connection
(actual time=[0.025, 0.025], calls=[1, 1])
====== Query Summary =====
------------------------------------------------------------------------------
Datanode executor start time [dn_6005_6006, dn_6003_6004]: [3.544 ms,8.513 ms]
Datanode executor end time [dn_6005_6006, dn_6001_6002]: [0.450 ms,0.576 ms]
System available mem: 3112960KB
Query Max mem: 3112960KB
Query estimated mem: 8802KB
Coordinator executor start time: 0.665 ms
Coordinator executor run time: 183.161 ms
Coordinator executor end time: 0.607 ms
Planner runtime: 1.517 ms
Query Id: 72339069014639134
Total runtime: 184.519 ms

针对计算倾斜场景,在数据重分布的时候对倾斜值进行特殊优化。此查询中

  1. JOIN的外表t2的字段b在值1上有倾斜
  2. 内外表除了1之外的值按照正常机制进行分布和计算
  3.  对于外表(t2)上重分布字段b为1的值,进行roundrobin分布(均匀打散到各个DN上)
  4. 对于内表(t1)上重分布字段b为1的值,进行broadcast分布(每个DN都保留一份)

注:除非t1表是复制分布,否则PART ROUNDROBIN和PART BROADCAST总是配对出现

通过这种分布机制,可以把外表重分布字段b为1的值的计算压力均摊到各个DN上,从而避免计算倾斜问题。从上述也可以看出比较明显的性能优化效果(373.079 ms-> 184.519 ms )

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/317257.html

(0)
上一篇 13小时前
下一篇 13小时前

相关推荐

发表回复

登录后才能评论