当您在Unique模型中导入数据并且需要保证数据正确时,云数据库 SelectDB 版为您提供了通过Sequence列实现条件更新的功能,使您在导入数据时可以控制列的替换顺序,进而保证您的数据正确。
前提条件
表的数据模型为Unique模型。SelectDB数据模型详情请参见数据模型。
概述
Unique模型一般用于唯一主键的场景,可以保证主键唯一性约束,但是由于使用REPLACE聚合方式,在同一批次中导入的数据不保证替换顺序。替换顺序无法保证则无法确定最终导入到表中的具体数据。为了解决这个问题,SelectDB提供了Sequence列,在导入时可指定Sequence列,通过指定Sequence列可以控制替换顺序。相同Key列的情况下,REPLACE聚合类型的列将按照Sequence列的值进行替换,较大值可以替换较小值,反之则无法替换。
基本原理
通过增加一个隐藏列__DORIS_SEQUENCE_COL__
实现。该列的类型由用户在建表时指定,在导入时确定该列具体值,并依据该值对REPLACE列进行替换。
导入
导入时,FE在解析过程中将隐藏列的值设置成order by
表达式的值(针对Broker Load和Routine Load),或者function_column.sequence_col
表达式的值(针对Stream Load),Value列将依据该值进行替换。隐藏列__DORIS_SEQUENCE_COL__
的值既可以是表结构中的已有列,也可以采用数据源中的一列。
读取
请求包含Value列时需要额外读取__DORIS_SEQUENCE_COL__
列,该列是在相同Key列下REPLACE聚合函数替换顺序的依据,较大值可以替换较小值,反之则不能替换。
Base Compaction和Cumulative Compaction也遵循读取的原理。
启用Sequence column支持
建表时启用Sequence column支持
Sequence列建表时有两种方式,一种是建表时设置sequence_col
属性,一种是建表时设置sequence_type
属性。建议在创建表时设置sequence_col
属性。
设置
sequence_col
推荐使用指定Sequence列类型为
sequence_col
。因为导入数据时无需感知Sequence列,和普通导入使用方式一致,使用简单。创建Unique表时,指定Sequence列类型为
sequence_col
,sequence_col
指定Sequence列到表中某一列的映射,该列可以为整型和时间类型(DATE、DATETIME),示例如下。PROPERTIES ( "function_column.sequence_col" = 'column_name');
说明创建后不能更改该列的类型。
设置
sequence_type
创建Unique表时,指定Sequence列类型为
sequence_type
,sequence_type
列可以为整型和时间类型(DATE、DATETIME)。示例如下。PROPERTIES ( "function_column.sequence_type" = 'column_name');
说明导入时需要指定Sequence列到其他列的映射。
建表后启用Sequence column支持
一个不支持Sequence column的表,可以使用如下语句来启用支持Sequence column,示例如下。
ALTER TABLE db_name.table_name
ENABLE FEATURE "SEQUENCE_LOAD"
WITH PROPERTIES ("function_column.sequence_type" = "column_name")
查看Sequence column支持
不确定一个表是否支持Sequence column,可以通过设置一个会话变量来显示隐藏列SET show_hidden_columns=true
,之后使用desc tablename
,如果输出中有__DORIS_SEQUENCE_COL__
列则支持,如果没有则不支持。
使用方式
以下分别介绍三种不同的导入方式设置数据源中的某列为Sequence列。
Stream Load
在Stream Load场景下,可以通过Header中的function_column.sequence_col
字段指定columns中某列作为Sequence列,示例如下。
curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load
Broker Load
在ORDER BY
处设置数据源中的某列作为Sequence列,示例如下。
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://host:port/user/data/*/test.txt")
INTO TABLE `tbl1`
COLUMNS TERMINATED BY ","
(k1,k2,source_sequence,v1,v2)
ORDER BY source_sequence
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
Routine Load
映射方式同上,示例如下。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, source_sequence, v1, v2),
WHERE k1 > 100 and k2 like "%doris%"
[ORDER BY source_sequence]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
使用示例
如下以Stream Load导入数据为例。
创建Unique模型的
test_table
数据表,并指定Sequence列映射到表中的modify_date
列。CREATE TABLE test_db.test_table ( user_id bigint, date date, group_id bigint, modify_date date, keyword VARCHAR(128) ) UNIQUE KEY(user_id, date, group_id) DISTRIBUTED BY HASH (user_id) BUCKETS 32 PROPERTIES( "function_column.sequence_col" = 'modify_date', "enable_unique_key_merge_on_write" = "true" );
表结构如下。
DESC test_table; +-------------+--------------+------+-------+---------+---------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-------+---------+---------+ | user_id | BIGINT | No | true | NULL | | | date | DATE | No | true | NULL | | | group_id | BIGINT | No | true | NULL | | | modify_date | DATE | No | false | NULL | REPLACE | | keyword | VARCHAR(128) | No | false | NULL | REPLACE | +-------------+--------------+------+-------+---------+---------+
创建文件,需要导入的文本数据如下。
1,2020-02-22,1,2020-02-21,a 1,2020-02-22,1,2020-02-22,b 1,2020-02-22,1,2020-03-05,c 1,2020-02-22,1,2020-02-26,d 1,2020-02-22,1,2020-02-23,e 1,2020-02-22,1,2020-02-24,b
以Stream Load为例,导入数据,示例如下。
curl --location-trusted -u root: -T data.csv -H "expect:100-continue" -H "column_separator:," http://host:port/api/test_db/test_table/_stream_load
查询导入结果,结果如下。
SELECT * FROM test_table; +---------+------------+----------+-------------+---------+ | user_id | date | group_id | modify_date | keyword | +---------+------------+----------+-------------+---------+ | 1 | 2020-02-22 | 1 | 2020-03-05 | c | +---------+------------+----------+-------------+---------+
因此这次导入中,因Sequence column的值(也就是
modify_date
中的值)里“2020-03-05”为最大值,所以keyword列中最终保留了c。创建文件,然后通过Stream Load将文件中数据导入,示例如下。
1,2020-02-22,1,2020-02-22,a 1,2020-02-22,1,2020-02-23,b
查询导入结果,结果如下。
SELECT * FROM test_table; +---------+------------+----------+-------------+---------+ | user_id | date | group_id | modify_date | keyword | +---------+------------+----------+-------------+---------+ | 1 | 2020-02-22 | 1 | 2020-03-05 | c | +---------+------------+----------+-------------+---------+
查询这次导入的数据,会比较所有已导入数据的Sequence列(也就是
modify_date
),其中“2020-03-05”仍为最大值,所以keyword列中最终保留了c。创建文件,然后通过Stream Load将文件中数据导入,示例如下。
1,2020-02-22,1,2020-02-22,a 1,2020-02-22,1,2020-03-23,w
查询导入结果,结果如下。
SELECT * FROM test_table; +---------+------------+----------+-------------+---------+ | user_id | date | group_id | modify_date | keyword | +---------+------------+----------+-------------+---------+ | 1 | 2020-02-22 | 1 | 2020-03-23 | w | +---------+------------+----------+-------------+---------+
此时Sequence列的最大值为“2020-03-23”,因此替换表中原有的数据。
综上,在导入过程中,会比较所有批次的Sequence列值,选择值最大的记录导入SelectDB表中。
在Stream Load或Broker Load等导入任务以及行更新Insert语句中,用户必须显式指定Sequence列(除非Sequence列的默认值为CURRENT_TIMESTAMP),否则会产生报错信息“Table test_table has sequence column, need to specify the sequence column”。
在部分列更新导入中,用户每次可以只更新一部分列,并不必须要包含Sequence列。若用户提交的导入任务中包含Sequence列,则无影响;若用户提交的导入任务不包含Sequence列,SelectDB会使用匹配的历史数据中的Sequence列作为更新后该行的Sequence列的值。如果历史数据中不存在相同Key的列,则会自动用NULL或默认值填充。