Colocation Join

本文介绍云数据库 SelectDB 版提供的Colocation Join的原理、实现、使用方式和注意事项,作为您选择Join方式进行查询优化的参考。

概述

Colocation Join为某些Join查询提供本地性优化,来减少数据在节点间的传输耗时,加速查询。最初的设计、实现和效果详情请参见ISSUE 245。Colocation Join功能经过一次改版,设计和使用方式和最初设计稍有不同。

重要

标注为可使用Colocation Join的表的这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含is_being_synced = true时,这个属性将会在这个表中被擦除。

名词解释

  • Colocation Group(CG):一个CG中会包含一张及以上的Table。在同一个Group内的Table有着相同的Colocation Group Schema,并且有着相同的数据分片分布。

  • Colocation Group Schema(CGS):用于描述一个CG中的Table,和Colocation相关的通用Schema信息。包括分桶列类型、分桶数等。

基本原理

Colocation Join功能,是将一组拥有相同CGS的Table组成一个CG,并保证这些Table对应的数据分片会落在同一个BE节点上,使得当CG内的表进行分桶列上的Join操作时,可以通过直接进行本地数据Join,减少数据在节点间的传输耗时。

一个表的数据,最终会根据分桶列值做哈希操作、对桶数取模后落在某一个分桶内。假设一个Table的分桶数为8,则共有[0, 1, 2, 3, 4, 5, 6, 7]8个分桶(Bucket),我们称这样一个序列为一个BucketsSequence。每个Bucket内会有一个或多个数据分片(Tablet)。当表为单分区表时,一个Bucket内仅有一个Tablet。如果是多分区表,则会有多个。

为了使得Table能够有相同的数据分布,同一CG内的Table必须保证分桶列和分桶数相同:分桶列,即在建表语句中DISTRIBUTED BY HASH(col1, col2, ...)中指定的列。分桶列决定了一张表的数据通过哪些列的值进行Hash划分到不同的Tablet中。同一CG内的Table必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。

同一个CG内的表,分区的个数、范围以及分区列的类型不要求一致。

在固定了分桶列和分桶数后,同一个CG内的表会拥有相同的BucketsSequence。假设BucketsSequence为[0, 1, 2, 3, 4, 5, 6, 7],BE节点有[A, B, C, D]4个。则一个可能的数据分布如下:

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

CG内所有表的数据都会按照上面的规则进行统一分布,这样可以保证分桶列值相同的数据都在同一个BE节点上,可以进行本地数据Join。

使用方式

创建表

建表时,可以在PROPERTIES中指定属性"colocate_with" = "group_name",表示这个表是一个Colocation Join表,并且归属于一个指定的Colocation Group。

示例如下。

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);

如果指定的Group不存在,则SelectDB会自动创建一个只包含当前这张表的Group。如果Group已存在,则SelectDB会检查当前表是否满足Colocation Group Schema。如果满足则会创建该表,并将该表加入Group。同时,表会根据已存在的Group中的数据分布规则创建分片和副本。Group归属于一个Database,Group的名字在一个Database内唯一。在内部,Group的全名被存储为dbId_groupName,但用户只感知groupName。

SelectDB支持跨Database的Group。在建表时,需使用关键词__global__作为Group名称的前缀,示例如下。

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "__global__group1"
);

__global__前缀的Group不再归属于一个Database,其名称也是全局唯一的。通过创建Global Group,可以实现跨Database的Colocate Join。

删除表

当Group中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过DROP TABLE命令删除后,会在回收站默认停留一天的时间后,再删除),该Group也会被自动删除。

查询表

对Colocation表的查询方式和普通表一样,您无需感知Colocation属性,系统自动规划采用Colocation Join方式。以下举例说明。

  1. 创建表。

    创建表tbl1,示例如下。

    CREATE TABLE `tbl1` (
        `k1` date NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` int(11) SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    PARTITION BY RANGE(`k1`)
    (
        PARTITION p1 VALUES LESS THAN ('2019-05-31'),
        PARTITION p2 VALUES LESS THAN ('2019-06-30')
    )
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );

    创建表tbl2,示例如下。

    CREATE TABLE `tbl2` (
        `k1` datetime NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` double SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );
  2. 查看查询计划,示例如下。

    DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN                           |
    |   |  hash predicates:                              |
    |   |  colocate: true                                |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----1:OlapScanNode                              |
    |   |       TABLE: tbl2                              |
    |   |       PREAGGREGATION: OFF. Reason: null        |
    |   |       partitions=0/1                           |
    |   |       rollup: null                             |
    |   |       buckets=0/0                              |
    |   |       cardinality=-1                           |
    |   |       avgRowSize=0.0                           |
    |   |       numNodes=0                               |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    +----------------------------------------------------+

    Colocation Join生效,则Hash Join节点会显示colocate: true

    如果没有生效,则查询计划如下。

    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN (BROADCAST)               |
    |   |  hash predicates:                              |
    |   |  colocate: false, reason: group is not stable  |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----3:EXCHANGE                                  |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    |                                                    |
    | PLAN FRAGMENT 1                                    |
    |  OUTPUT EXPRS:                                     |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   STREAM DATA SINK                                 |
    |     EXCHANGE ID: 03                                |
    |     UNPARTITIONED                                  |
    |                                                    |
    |   1:OlapScanNode                                   |
    |      TABLE: tbl2                                   |
    |      PREAGGREGATION: OFF. Reason: null             |
    |      partitions=0/1                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 1                                  |
    +----------------------------------------------------+

    Hash Join节点会显示对应原因:colocate: false, reason: group is not stable。同时会有一个 EXCHANGE节点生成。

查看Colocate Group

查看集群内已存在的Group信息,示例如下。

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId     | GroupName    | TableIds     | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10         | 3              | int(11)  | true     |
+-------------+--------------+--------------+------------+----------------+----------+----------+

返回参数说明如下。

参数名称

说明

GroupId

一个Group的全集群唯一标识,前半部分为DB ID,后半部分为GROUP ID。

GroupName

Group的全名。

TabletIds

该Group包含的Table的ID列表。

BucketsNum

分桶数。

ReplicationNum

副本数。

DistCols

Distribution columns,即分桶列类型。

IsStable

该Group是否稳定(稳定的定义,见Colocation 副本均衡和修复一节)。

查看一个Group的数据分布情况,示例如下。

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds          |
+-------------+---------------------+
| 0           | 10004               |
| 1           | 10003               |
| 2           | 10002               |
| 3           | 10003               |
| 4           | 10002               |
| 5           | 10003               |
| 6           | 10003               |
| 7           | 10003               |
+-------------+---------------------+

参数名称

说明

BucketIndex

分桶序列的下标。

BackendIds

分桶中数据分片所在的BE节点ID列表。

说明

以上命令需要ADMIN权限,暂不支持普通用户查看。

修改表Colocate Group属性

对一个已经创建的表,修改其Colocation Group属性,示例如下。

ALTER TABLE tbl SET ("colocate_with" = "group2");
  • 该表之前没有指定过Group,则该命令检查Schema,并将该表加入到该Group(Group不存在则会创建)。

  • 该表之前有指定其他Group,则该命令会先将该表从原有Group中移除,并加入新Group(Group不存在则会创建)。

也可以通过以下命令,删除一个表的Colocation属性,示例如下。

ALTER TABLE tbl SET ("colocate_with" = "");

其他相关操作

当对一个具有Colocation属性的表进行增加分区(ADD PARTITION)、修改副本数时,SelectDB会检查修改是否会违反Colocation Group Schema,如果违反则会拒绝。

使用进阶

FE配置项

  • disable_colocate_relocate

    是否关闭SelectDB的自动Colocation副本修复。默认为false,即不关闭。该参数只影响Colocation表的副本修复,不影响普通表。

  • disable_colocate_balance

    是否关闭SelectDB的自动Colocation副本均衡。默认为false,即不关闭。该参数只影响Colocation表的副本均衡,不影响普通表。

以上参数可以动态修改,设置方式请参阅HELP ADMIN SHOW CONFIG;HELP ADMIN SET CONFIG;

  • disable_colocate_join

    是否关闭 Colocation Join 功能。默认为false,即开启。

  • use_new_tablet_scheduler

    是否启用新的副本调度逻辑,默认为true,即开启。

HTTP Restful API

SelectDB提供了几个和Colocation Join有关的HTTP Restful API,用于查看和修改Colocation Group。

该API实现在FE端,使用fe_host:fe_http_port进行访问。访问用户需要ADMIN权限。

  • 查看集群的全部Colocation信息,示例如下。

    GET /api/colocate
    
    返回以 Json 格式表示内部 Colocation 信息。
    
    {
        "msg": "success",
        "code": 0,
        "data": {
            "infos": [
                ["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
            ],
            "unstableGroupIds": [],
            "allGroupIds": [{
                "dbId": 10003,
                "grpId": 12002
            }]
        },
        "count": 0
    }
  • 将Group标记为Stable或Unstable,示例如下。

    • 标记为Stable。

      DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
      
      返回:200
    • 标记为Unstable。

      POST /api/colocate/group_stable?db_id=10005&group_id=10008
      
      返回:200
  • 设置Group的数据分布,示例如下。

    该接口可以强制设置某一Group的数据分布。返回结果中Body是以嵌套数组表示的BucketsSequence以及每个Bucket中分片分布所在BE的ID。

    POST /api/colocate/bucketseq?db_id=10005&group_id=10008
    
    Body:
    [[10004],[10003],[10002],[10003],[10002],[10003],[10003],[10003],[10003],[10002]]
    
    返回 200
    说明

    使用该命令,需要将FE的配置disable_colocate_relocatedisable_colocate_balance设为true,即关闭系统自动的Colocation副本修复和均衡。否则在手动修改后,被系统自动重置。

Colocation副本均衡和修复

Colocation表的副本分布需要遵循Group中指定的分布,所以在副本修复和均衡方面和普通分片有所区别。

Group自身有一个Stable属性,当Stable属性为true时,表示当前Group内的表的所有分片都没有正在进行变动,Colocation特性可以正常使用。当Stable为 false 时(Unstable),表示当前Group内有部分表的分片正在进行修复或迁移,此时,相关表的Colocation Join将退化为普通Join。

副本修复

副本只能存储在指定的BE节点上,所以当某个BE不可用时(宕机、Decommission 等),需要寻找一个新的BE进行替换。在这种场景下,SelectDB会优先寻找负载最低的BE进行替换。替换后,该Bucket内的所有在旧BE上的数据分片都要做修复。迁移过程中,Group被标记为Unstable。

副本均衡

SelectDB会将Colocation表的分片均匀分布在所有BE节点上。普通表的副本均衡是以单副本为粒度的,即单独为每一个副本寻找负载较低的BE节点。而Colocation表的均衡是Bucket级别的,即一个Bucket内的所有副本都会一起迁移。

SelectDB采用一个简单的均衡算法,即在不考虑副本实际大小,而只根据副本数量,将BucketsSequence均匀的分布在所有BE上。

说明
  • 当前的Colocation副本均衡和修复算法,对于异构部署的SelectDB实例效果可能不佳。所谓异构部署,即BE节点的磁盘容量、数量、磁盘类型(SSD和HDD)不一致。在异构部署情况下,可能出现小容量的BE节点和大容量的BE节点存储了相同的副本数量。

  • 当一个Group处于Unstable状态时,其中的表的Join将退化为普通Join。此时可能会极大降低集群的查询性能。如果不希望系统自动均衡,可以设置 FE 的配置项disable_colocate_balance来禁止自动均衡。如果需要该功能手动打开即可。