本文介绍云数据库 SelectDB 版提供的Colocation Join的原理、实现、使用方式和注意事项,作为您选择Join方式进行查询优化的参考。
概述
Colocation Join为某些Join查询提供本地性优化,来减少数据在节点间的传输耗时,加速查询。最初的设计、实现和效果详情请参见ISSUE 245。Colocation Join功能经过一次改版,设计和使用方式和最初设计稍有不同。
标注为可使用Colocation Join的表的这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含is_being_synced = true
时,这个属性将会在这个表中被擦除。
名词解释
Colocation Group(CG):一个CG中会包含一张及以上的Table。在同一个Group内的Table有着相同的Colocation Group Schema,并且有着相同的数据分片分布。
Colocation Group Schema(CGS):用于描述一个CG中的Table,和Colocation相关的通用Schema信息。包括分桶列类型、分桶数等。
基本原理
Colocation Join功能,是将一组拥有相同CGS的Table组成一个CG,并保证这些Table对应的数据分片会落在同一个BE节点上,使得当CG内的表进行分桶列上的Join操作时,可以通过直接进行本地数据Join,减少数据在节点间的传输耗时。
一个表的数据,最终会根据分桶列值做哈希操作、对桶数取模后落在某一个分桶内。假设一个Table的分桶数为8,则共有[0, 1, 2, 3, 4, 5, 6, 7]
8个分桶(Bucket),我们称这样一个序列为一个BucketsSequence。每个Bucket内会有一个或多个数据分片(Tablet)。当表为单分区表时,一个Bucket内仅有一个Tablet。如果是多分区表,则会有多个。
为了使得Table能够有相同的数据分布,同一CG内的Table必须保证分桶列和分桶数相同:分桶列,即在建表语句中DISTRIBUTED BY HASH(col1, col2, ...)
中指定的列。分桶列决定了一张表的数据通过哪些列的值进行Hash划分到不同的Tablet中。同一CG内的Table必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
同一个CG内的表,分区的个数、范围以及分区列的类型不要求一致。
在固定了分桶列和分桶数后,同一个CG内的表会拥有相同的BucketsSequence。假设BucketsSequence为[0, 1, 2, 3, 4, 5, 6, 7]
,BE节点有[A, B, C, D]
4个。则一个可能的数据分布如下:
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
CG内所有表的数据都会按照上面的规则进行统一分布,这样可以保证分桶列值相同的数据都在同一个BE节点上,可以进行本地数据Join。
使用方式
创建表
建表时,可以在PROPERTIES
中指定属性"colocate_with" = "group_name"
,表示这个表是一个Colocation Join表,并且归属于一个指定的Colocation Group。
示例如下。
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);
如果指定的Group不存在,则SelectDB会自动创建一个只包含当前这张表的Group。如果Group已存在,则SelectDB会检查当前表是否满足Colocation Group Schema。如果满足则会创建该表,并将该表加入Group。同时,表会根据已存在的Group中的数据分布规则创建分片和副本。Group归属于一个Database,Group的名字在一个Database内唯一。在内部,Group的全名被存储为dbId_groupName
,但用户只感知groupName。
SelectDB支持跨Database的Group。在建表时,需使用关键词__global__
作为Group名称的前缀,示例如下。
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "__global__group1"
);
__global__
前缀的Group不再归属于一个Database,其名称也是全局唯一的。通过创建Global Group,可以实现跨Database的Colocate Join。
删除表
当Group中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过DROP TABLE
命令删除后,会在回收站默认停留一天的时间后,再删除),该Group也会被自动删除。
查询表
对Colocation表的查询方式和普通表一样,您无需感知Colocation属性,系统自动规划采用Colocation Join方式。以下举例说明。
创建表。
创建表tbl1,示例如下。
CREATE TABLE `tbl1` ( `k1` date NOT NULL COMMENT "", `k2` int(11) NOT NULL COMMENT "", `v1` int(11) SUM NOT NULL COMMENT "" ) ENGINE=OLAP AGGREGATE KEY(`k1`, `k2`) PARTITION BY RANGE(`k1`) ( PARTITION p1 VALUES LESS THAN ('2019-05-31'), PARTITION p2 VALUES LESS THAN ('2019-06-30') ) DISTRIBUTED BY HASH(`k2`) BUCKETS 8 PROPERTIES ( "colocate_with" = "group1" );
创建表tbl2,示例如下。
CREATE TABLE `tbl2` ( `k1` datetime NOT NULL COMMENT "", `k2` int(11) NOT NULL COMMENT "", `v1` double SUM NOT NULL COMMENT "" ) ENGINE=OLAP AGGREGATE KEY(`k1`, `k2`) DISTRIBUTED BY HASH(`k2`) BUCKETS 8 PROPERTIES ( "colocate_with" = "group1" );
查看查询计划,示例如下。
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2); +----------------------------------------------------+ | Explain String | +----------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:`tbl1`.`k1` | | | PARTITION: RANDOM | | | | RESULT SINK | | | | 2:HASH JOIN | | | join op: INNER JOIN | | | hash predicates: | | | colocate: true | | | `tbl1`.`k2` = `tbl2`.`k2` | | | tuple ids: 0 1 | | | | | |----1:OlapScanNode | | | TABLE: tbl2 | | | PREAGGREGATION: OFF. Reason: null | | | partitions=0/1 | | | rollup: null | | | buckets=0/0 | | | cardinality=-1 | | | avgRowSize=0.0 | | | numNodes=0 | | | tuple ids: 1 | | | | | 0:OlapScanNode | | TABLE: tbl1 | | PREAGGREGATION: OFF. Reason: No AggregateInfo | | partitions=0/2 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 | | tuple ids: 0 | +----------------------------------------------------+
Colocation Join生效,则Hash Join节点会显示
colocate: true
。如果没有生效,则查询计划如下。
+----------------------------------------------------+ | Explain String | +----------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:`tbl1`.`k1` | | | PARTITION: RANDOM | | | | RESULT SINK | | | | 2:HASH JOIN | | | join op: INNER JOIN (BROADCAST) | | | hash predicates: | | | colocate: false, reason: group is not stable | | | `tbl1`.`k2` = `tbl2`.`k2` | | | tuple ids: 0 1 | | | | | |----3:EXCHANGE | | | tuple ids: 1 | | | | | 0:OlapScanNode | | TABLE: tbl1 | | PREAGGREGATION: OFF. Reason: No AggregateInfo | | partitions=0/2 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 | | tuple ids: 0 | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | UNPARTITIONED | | | | 1:OlapScanNode | | TABLE: tbl2 | | PREAGGREGATION: OFF. Reason: null | | partitions=0/1 | | rollup: null | | buckets=0/0 | | cardinality=-1 | | avgRowSize=0.0 | | numNodes=0 | | tuple ids: 1 | +----------------------------------------------------+
Hash Join节点会显示对应原因:
colocate: false, reason: group is not stable
。同时会有一个 EXCHANGE节点生成。
查看Colocate Group
查看集群内已存在的Group信息,示例如下。
SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
返回参数说明如下。
参数名称 | 说明 |
GroupId | 一个Group的全集群唯一标识,前半部分为DB ID,后半部分为GROUP ID。 |
GroupName | Group的全名。 |
TabletIds | 该Group包含的Table的ID列表。 |
BucketsNum | 分桶数。 |
ReplicationNum | 副本数。 |
DistCols | Distribution columns,即分桶列类型。 |
IsStable | 该Group是否稳定(稳定的定义,见 |
查看一个Group的数据分布情况,示例如下。
SHOW PROC '/colocation_group/10005.10008';
+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004 |
| 1 | 10003 |
| 2 | 10002 |
| 3 | 10003 |
| 4 | 10002 |
| 5 | 10003 |
| 6 | 10003 |
| 7 | 10003 |
+-------------+---------------------+
参数名称 | 说明 |
BucketIndex | 分桶序列的下标。 |
BackendIds | 分桶中数据分片所在的BE节点ID列表。 |
以上命令需要ADMIN权限,暂不支持普通用户查看。
修改表Colocate Group属性
对一个已经创建的表,修改其Colocation Group属性,示例如下。
ALTER TABLE tbl SET ("colocate_with" = "group2");
该表之前没有指定过Group,则该命令检查Schema,并将该表加入到该Group(Group不存在则会创建)。
该表之前有指定其他Group,则该命令会先将该表从原有Group中移除,并加入新Group(Group不存在则会创建)。
也可以通过以下命令,删除一个表的Colocation属性,示例如下。
ALTER TABLE tbl SET ("colocate_with" = "");
其他相关操作
当对一个具有Colocation属性的表进行增加分区(ADD PARTITION)、修改副本数时,SelectDB会检查修改是否会违反Colocation Group Schema,如果违反则会拒绝。
使用进阶
FE配置项
disable_colocate_relocate
是否关闭SelectDB的自动Colocation副本修复。默认为false,即不关闭。该参数只影响Colocation表的副本修复,不影响普通表。
disable_colocate_balance
是否关闭SelectDB的自动Colocation副本均衡。默认为false,即不关闭。该参数只影响Colocation表的副本均衡,不影响普通表。
以上参数可以动态修改,设置方式请参阅HELP ADMIN SHOW CONFIG;
和HELP ADMIN SET CONFIG;
。
disable_colocate_join
是否关闭 Colocation Join 功能。默认为false,即开启。
use_new_tablet_scheduler
是否启用新的副本调度逻辑,默认为true,即开启。
HTTP Restful API
SelectDB提供了几个和Colocation Join有关的HTTP Restful API,用于查看和修改Colocation Group。
该API实现在FE端,使用fe_host:fe_http_port
进行访问。访问用户需要ADMIN权限。
查看集群的全部Colocation信息,示例如下。
GET /api/colocate 返回以 Json 格式表示内部 Colocation 信息。 { "msg": "success", "code": 0, "data": { "infos": [ ["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"] ], "unstableGroupIds": [], "allGroupIds": [{ "dbId": 10003, "grpId": 12002 }] }, "count": 0 }
将Group标记为Stable或Unstable,示例如下。
标记为Stable。
DELETE /api/colocate/group_stable?db_id=10005&group_id=10008 返回:200
标记为Unstable。
POST /api/colocate/group_stable?db_id=10005&group_id=10008 返回:200
设置Group的数据分布,示例如下。
该接口可以强制设置某一Group的数据分布。返回结果中Body是以嵌套数组表示的BucketsSequence以及每个Bucket中分片分布所在BE的ID。
POST /api/colocate/bucketseq?db_id=10005&group_id=10008 Body: [[10004],[10003],[10002],[10003],[10002],[10003],[10003],[10003],[10003],[10002]] 返回 200
说明使用该命令,需要将FE的配置
disable_colocate_relocate
和disable_colocate_balance
设为true,即关闭系统自动的Colocation副本修复和均衡。否则在手动修改后,被系统自动重置。
Colocation副本均衡和修复
Colocation表的副本分布需要遵循Group中指定的分布,所以在副本修复和均衡方面和普通分片有所区别。
Group自身有一个Stable属性,当Stable属性为true时,表示当前Group内的表的所有分片都没有正在进行变动,Colocation特性可以正常使用。当Stable为 false 时(Unstable),表示当前Group内有部分表的分片正在进行修复或迁移,此时,相关表的Colocation Join将退化为普通Join。
副本修复
副本只能存储在指定的BE节点上,所以当某个BE不可用时(宕机、Decommission 等),需要寻找一个新的BE进行替换。在这种场景下,SelectDB会优先寻找负载最低的BE进行替换。替换后,该Bucket内的所有在旧BE上的数据分片都要做修复。迁移过程中,Group被标记为Unstable。
副本均衡
SelectDB会将Colocation表的分片均匀分布在所有BE节点上。普通表的副本均衡是以单副本为粒度的,即单独为每一个副本寻找负载较低的BE节点。而Colocation表的均衡是Bucket级别的,即一个Bucket内的所有副本都会一起迁移。
SelectDB采用一个简单的均衡算法,即在不考虑副本实际大小,而只根据副本数量,将BucketsSequence均匀的分布在所有BE上。
当前的Colocation副本均衡和修复算法,对于异构部署的SelectDB实例效果可能不佳。所谓异构部署,即BE节点的磁盘容量、数量、磁盘类型(SSD和HDD)不一致。在异构部署情况下,可能出现小容量的BE节点和大容量的BE节点存储了相同的副本数量。
当一个Group处于Unstable状态时,其中的表的Join将退化为普通Join。此时可能会极大降低集群的查询性能。如果不希望系统自动均衡,可以设置 FE 的配置项
disable_colocate_balance
来禁止自动均衡。如果需要该功能手动打开即可。