通过导入实现删除

为了减轻频繁DELETE操作对于云数据库 SelectDB 版实例查询性能的影响,云数据库 SelectDB 版提供了通过导入方式进行删除数据的解决方法,帮助您优化数据查询性能。

概述

使用DELETE命令删除时,每执行一次DELETE都会生成一个新的数据版本,如果频繁地进行删除操作,会严重影响查询性能。并且使用DELETE方式删除,是通过生成一个空的Rowset来记录删除条件实现,每次读取都要对删除条件进行过滤,同样会在条件较多时对查询性能造成影响。

例如使用CDC进行数据导入,数据导入时INSERT和DELETE一般是穿插出现的,因此普通的导入方式无法满足。即使通过分离出INSERT和DELETE来解决导入的问题,但是仍然解决不了删除的问题。

通过导入方式实现数据删除功能,可以满足此类需求。目前SelectDB提供的Stream Load,OSS Load,Routine Load等导入方式,均支持通过导入来删除数据。导入过程进行数据写入还是数据删除,由导入任务属性merge_type决定,merge_type有以下三种。

  • APPEND:导入数据追加到表中,和普通导入效果一致。

  • DELETE:导入数据用于删除表中数据,删除表中所有与导入数据Key列相同的行(当表存在Sequence列时,需要同时满足主键相同以及Sequence列的大小逻辑才能正确删除)。

  • MERGE:配合DELETE ON表达式使用,根据表达式的结果,决定执行APPEND还是DELETE操作。

基本原理

SelectDB通过增加一个隐藏列__DORIS_DELETE_SIGN__实现导入方式删除。由于导入方式删除是在Unique模型的基础上实现,因此只需要增加一个类型为Boolean,聚合函数为Replace的隐藏列即可。

导入

导入阶段,SelectDB对隐藏列的处理都和正常列一样,并会将隐藏列的值设置成相应的Boolean值。merge_type为MERGE类型时,隐藏列取值为DELETE ON表达式的结果。其他的聚合行为和Replace的聚合列相同。

读取

读取阶段,会采用如下处理方式:在FE遇到*等扩展时去掉__DORIS_DELETE_SIGN__,并且默认加上__DORIS_DELETE_SIGN__ != true的条件,BE读取时都会加上一列进行判断,通过条件确定是否删除。

说明
  • Cumulative Compaction时会将隐藏列看作正常的列处理,Compaction逻辑没有变化。

  • Base Compaction时会将标记为删除的行的删掉,以减少数据占用的空间。

开启导入方式删除

开启导入方式删除有以下两种形式:

  • 通过在FE配置文件中增加enable_batch_delete_by_default=true以支持。在重启FE后新建的表都支持导入方式删除,此选项默认为true。

  • 对于没有更改上述FE配置的实例,或对于已存在的不支持导入方式删除功能的表,可以使用如下语句:ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE"来启用导入方式删除。该操作本质上是一个Schema变更操作,操作立即返回,可以通过show alter table column来确认操作是否完成。

查看导入方式删除

  • 表是否支持导入方式删除,可以通过设置会话变量SET show_hidden_columns=true来显示隐藏列,然后执行desc tablename,如果其中有__DORIS_DELETE_SIGN__列则支持,否则不支持。

使用方式

通过导入方式进行删除的操作在语法设计方面,主要是指定一个删除标记列,并且需要在导入的数据中增加该列的值用于判断处理。以下分别介绍三种不同导入方式的设置语法。

Stream Load

Stream Load的场景下,可以在Header中的columns字段增加一个代表删除标记列的字段,并通过Header中delete字段指定DELETE ON表达式,示例如下。

curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: k1,k2,delete_sign" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: delete_sign=1" -T data.csv http://host:port/api/test_db/test_table/_stream_load

OSS Load

OSS Load的场景下,可以在COLUMNS字段中增加删除标记列,之后指定DELETE ON表达式,示例如下。

LOAD LABEL test_db.test_label_1
(
    [MERGE|APPEND|DELETE] DATA INFILE("s3://your_bucket_name/your_file.txt")
    INTO TABLE test_table
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2, delete_sign)
    SET
    (
        id=tmp_c2,
        name=tmp_c1,
    )
    [DELETE ON delete_sign=true]
)
WITH S3
(
    "AWS_PROVIDER" = "OSS",
    "AWS_REGION" = "oss-cn-beijing",  
    "AWS_ENDPOINT" = "oss-cn-beijing-internal.aliyuncs.com",
    "AWS_ACCESS_KEY" = "<your_access_key>",
    "AWS_SECRET_KEY"="<your_secret_key>"
)
PROPERTIES
(
    "timeout" = "3600"
);

Routine Load

Routine Load的场景下,可以在COLUMNS字段中增加删除标记列,之后指定DELETE ON表达式,示例如下。

CREATE ROUTINE LOAD test_db.test1 ON test_table 
 [WITH MERGE|APPEND|DELETE]
 COLUMNS(k1, k2, k3, v1, v2, delete_sign),
 WHERE k1 > 100 and k2 like "%doris%"
 [DELETE ON delete_sign=true]
 PROPERTIES
 (
     "desired_concurrent_number"="3",
     "max_batch_interval" = "20",
     "max_batch_rows" = "300000",
     "max_batch_size" = "209715200",
     "strict_mode" = "false"
 )
 FROM KAFKA
 (
     "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
     "kafka_topic" = "my_topic",
     "kafka_partitions" = "0,1,2,3",
     "kafka_offsets" = "101,0,0,200"
 );
重要
  • 由于除Stream Load外的导入操作在SelectDB内部有可能乱序执行,因此在使用MERGE方式导入时,如果不是Stream Load场景,那么需要与Load Sequence一起使用,详细说明请参见Sequence列控制更新

  • DELETE ON条件只能与MERGE一起使用。

  • 如果在执行导入作业前,按上文所述开启了SET show_hidden_columns = true的会话变量来查看表是否支持导入方式删除,并按示例完成DELETE/MERGE的导入作业后,在同一个session中执行select count(*) from xxx等语句时,需要执行SET show_hidden_columns = false或者开启新的Session,避免查询结果中包含那些被导入方式删除的记录,导致结果产生错误。

使用示例

如下以Stream Load导入数据为例。

不存在Sequence列

  1. 创建待导入的SelectDB数据表test_table如下。

    CREATE TABLE `test_table` (
        `siteid` varchar(100) NULL,
        `citycode` varchar(100) NULL,
        `username` varchar(100) NULL,
        `pv` int NULL,
    ) ENGINE=OLAP
    UNIQUE KEY(`siteid`,`citycode`,`username`)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`siteid`) BUCKETS 16;
  2. 查看是否启用导入方式删除。

    SET show_hidden_columns=true;
    Query OK, 0 rows affected (0.00 sec)
    
    DESC test_table;
    +-----------------------+--------------+------+-------+---------+---------+
    | Field                 | Type         | Null | Key   | Default | Extra   |
    +-----------------------+--------------+------+-------+---------+---------+
    | siteid                | VARCHAR(100) | No   | true  | NULL    |         |
    | citycode              | VARCHAR(100) | No   | true  | NULL    |         |
    | username              | VARCHAR(100) | No   | true  | NULL    |         |
    | pv                    | INT          | Yes  | false | NULL    | REPLACE |
    | __DORIS_DELETE_SIGN__ | TINYINT      | No   | false | 0       | REPLACE |
    +-----------------------+--------------+------+-------+---------+---------+
    4 rows in set (0.00 sec)
  3. 向表test_table插入数据如下。

    +--------+----------+----------+------+
    | siteid | citycode | username | pv   |
    +--------+----------+----------+------+
    |      3 |        2 | tom      |    2 |
    |      4 |        3 | bush     |    3 |
    |      5 |        3 | helen    |    3 |
    +--------+----------+----------+------+
  4. 对于测试表test_table,导入数据。

    curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ~/data.csv http://127.0.0.1:8130/api/test_db/test_table/_stream_load

    其中的APPEND条件可以省略,与普通的Stream Load语句效果相同。

    curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns: siteid, citycode, username, pv" -T ~/data.csv http://127.0.0.1:8130/api/test_db/test_table/_stream_load
  5. 将导入数据Key相同的数据全部删除。

    curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ~/data.csv http://127.0.0.1:8130/api/test_db/test_table/_stream_load
  6. 新导入数据如下。

    3,2,tom,0
  7. 导入后,查询表的数据如下。

    +--------+----------+----------+------+
    | siteid | citycode | username | pv   |
    +--------+----------+----------+------+
    |      4 |        3 | bush     |    3 |
    |      5 |        3 | helen    |    3 |
    +--------+----------+----------+------+
  8. 由于merge_type方式为MERGE,导入数据中site_id=1的行用于DELETE,其他行用于APPEND。

    curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ~/data.csv http://127.0.0.1:8130/api/test_db/test_table/_stream_load
  9. 例如导入前表的数据为。

    +--------+----------+----------+------+
    | siteid | citycode | username | pv   |
    +--------+----------+----------+------+
    |      4 |        3 | bush     |    3 |
    |      5 |        3 | helen    |    3 |
    |      1 |        1 | jim      |    2 |
    +--------+----------+----------+------+
  10. 新导入的数据如下。

  11. 2,1,grace,2
    3,2,tom,2
    1,1,jim,2
  12. 导入后,查询表的数据如下。

  13. +--------+----------+----------+------+
    | siteid | citycode | username | pv   |
    +--------+----------+----------+------+
    |      4 |        3 | bush     |    3 |
    |      2 |        1 | grace    |    2 |
    |      3 |        2 | tom      |    2 |
    |      5 |        3 | helen    |    3 |
    +--------+----------+----------+------+

存在Sequence列

  1. 当存在Sequence列时,将与导入数据Key相同的行全部删除。

    curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns: name, gender, age" -H "function_column.sequence_col: age" -H "merge_type: DELETE" -T ~/data.csv http://127.0.0.1:8130/api/test_db/test_table/_stream_load

    当Unique表设置了Sequence列时,在相同Key列下,Sequence列的值会作为REPLACE聚合函数替换顺序的依据,较大值才会替换较小值。当导入方式删除与Sequence列结合使用时,需要保证Key相同且Sequence列值要大于等于当前值,才会触发删除。

  2. 创建待导入的另一个SelectDB数据表:

    CREATE TABLE `test_table` (
        `name` varchar(100) NULL,
        `gender` varchar(10) NULL,
        `age` INT NULL,
    ) ENGINE=OLAP
    UNIQUE KEY(`name`)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`name`) BUCKETS 16
    PROPERTIES(
        "function_column.sequence_col" = 'age'
    );
  3. 查询表的结构如下。

    SET show_hidden_columns=true;
    Query OK, 0 rows affected (0.00 sec)
    
    DESC test_table;
    +------------------------+--------------+------+-------+---------+---------+
    | Field                  | Type         | Null | Key   | Default | Extra   |
    +------------------------+--------------+------+-------+---------+---------+
    | name                   | VARCHAR(100) | No   | true  | NULL    |         |
    | gender                 | VARCHAR(10)  | Yes  | false | NULL    | REPLACE |
    | age                    | INT          | Yes  | false | NULL    | REPLACE |
    | __DORIS_DELETE_SIGN__  | TINYINT      | No   | false | 0       | REPLACE |
    | __DORIS_SEQUENCE_COL__ | INT          | Yes  | false | NULL    | REPLACE |
    +------------------------+--------------+------+-------+---------+---------+
    4 rows in set (0.00 sec)
  4. 导入前,表的数据为:

  5. +-------+--------+------+
    | name  | gender | age  |
    +-------+--------+------+
    | li    | male   |   10 |
    | wang  | male   |   14 |
    | zhang | male   |   12 |
    +-------+--------+------+
  6. 新导入的数据如下:

  7. li,male,10
  8. 导入后,查询表的数据如下。数据被删除成功。

  9. +-------+--------+------+
    | name  | gender | age  |
    +-------+--------+------+
    | wang  | male   |   14 |
    | zhang | male   |   12 |
    +-------+--------+------+
  10. 如果导入数据如下:

  11. li,male,9
  12. 导入后,查询表的数据如下,相应数据并没有被删除。这是因为在底层的依赖关系上,会先判断Key相同的情况,对外展示Sequence列值大的行数据,然后再看该行的__DORIS_DELETE_SIGN__值是否为1,如果为1则不会对外展示;如果为0,则仍会读出数据。

  13. +-------+--------+------+
    | name  | gender | age  |
    +-------+--------+------+
    | li    | male   |   10 |
    | wang  | male   |   14 |
    | zhang | male   |   12 |
    +-------+--------+------+
说明

当导入数据中同时存在数据写入和删除时(例如Flink CDC场景中),使用Sequence列可以有效的保证数据乱序到达时的一致性,避免后到达的一个旧版本的删除操作,误删掉了先到达的新版本的数据。