本文介绍如何使用分组导入(Group Commit)模式在高并发小写入的场景下来提升导入性能。
概述
Group Commit不是一种新的导入方式,而是对INSERT INTO tbl VALUES(...)
、Stream Load
的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用JDBC将数据高频写入云数据库 SelectDB 版,同时通过使用PreparedStatement可以获得更高的性能。在日志场景下,您也可以利用Stream Load或者Http Stream将数据高频写入云数据库 SelectDB 版。Group Commit写入有如下三种模式。
关闭模式(off_mode)
不开启Group Commit,保持以上三种导入方式的默认行为。
同步模式(sync_mode)
SelectDB会根据负载和表的
group_commit_interval
属性将多个导入在一个事务提交,事务提交后返回导入结果。这适用于高并发写入场景,且在导入完成后要求数据立即可见。异步模式(async_mode)
异步模式适用于写入延迟敏感以及高频写入的场景。SelectDB首先将数据写入WAL(Write Ahead Log),然后立即返回导入结果。SelectDB会根据负载和表的
group_commit_interval
属性异步提交数据,提交之后数据可见。为了防止WAL占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode
。
使用限制
当开启了Group Commit模式,系统会判断您发起的
INSERT INTO VALUES
语句是否符合Group Commit的条件,如果符合,该语句的执行会进入到Group Commit写入中。但符合如下条件的语句会自动退化为非Group Commit方式。事务写入:即
Begin
;INSERT INTO VALUES
;COMMIT
方式指定Label:即
INSERT INTO dt WITH LABEL {label} VALUES
VALUES中包含表达式:即
INSERT INTO dt VALUES (1 + 100)
列更新写入。
表不支持light schema change。
当开启了Group Commit模式,系统会判断您发起的
Stream Load
和Http Stream
是否符合Group Commit的条件,如果符合,该导入的执行会进入到Group Commit写入中。但符合如下条件的会自动退化为非Group Commit方式。指定Label:即通过
-H "label:my_label"
设置。两阶段提交。
列更新写入。
表不支持light schema change。
对于Unique模型,由于Group Commit不能保证提交顺序,您可以配合Sequence列使用来保证数据一致性。
对
max_filter_ratio
语义的支持。在默认的导入中,
filter_ratio
是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入。在Group Commit模式下,由于多个客户端发起的导入都会由一个内部导入执行,虽然可以计算出每个导入的
filter_ratio
,但是数据一旦进入内部导入,就只能commit transaction。Group Commit模式支持了一定程度的
max_filter_ratio
语义,当导入的总行数不高于group_commit_memory_rows_for_max_filter_ratio
(BE配置参数,默认为10000
行),max_filter_ratio
正常工作。
WAL限制
对于
async_mode
的Group Commit写入,会把数据写入WAL。如果内部导入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据。对于
async_mode
的Group Commit写入,为了保护磁盘空间,当遇到如下情况时,会切换成sync_mode
。导入数据量过大,即超过WAL单目录的80%空间。
数据量规模不确定的chunked stream load。
导入数据量不大,但磁盘可用空间不足。
当发生重量级Schema Change(目前加减列、修改varchar长度和重命名列是轻量级Schema Change,其它的是重量级Schema Change)时,为了保证WAL能够适配表的Schema,在Schema Change最后的修改元数据阶段,会拒绝Group Commit写入,客户端收到
insert table ${table_name} is blocked on schema change
异常,请您在客户端重试即可。
使用示例
创建表dt
,示例如下。
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;
使用JDBC方式
当您使用JDBCINSERT INTO VALUES
方式写入时,为了减少SQL解析和生成规划的开销,云数据库 SelectDB 版支持了MySQL协议的PreparedStatement
特性。当您使用PreparedStatement
时,SQL和其导入规划将被缓存到Session级别的内存缓存中,后续的导入直接使用缓存对象,降低了集群的CPU压力。下面是在JDBC中使用PreparedStatement
的使用示例。
添加依赖。
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>
设置JDBC URL并在Server端开启PreparedStatement。
url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
配置
group_commit
session变量,有如下两种方式。通过JDBC URL设置,增加参数
sessionVariables=group_commit=async_mode
。url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
通过执行SQL设置
try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); }
使用
PreparedStatement
。private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true"; private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com"; private static final int PORT = 9030; private static final String DB = "db"; private static final String TBL = "dt"; private static final String USER = "admin"; private static final String PASSWD = "***"; private static final int INSERT_BATCH_SIZE = 10; public static void main(String[] args) { groupCommitInsert(); //groupCommitInsertBatch } private static void groupCommitInsert() throws Exception { Class.forName(JDBC_DRIVER); try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { // set session variable 'group_commit' try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); } String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); int result = stmt.executeUpdate(); System.out.println("rows: " + result); } } } catch (Exception e) { e.printStackTrace(); } } private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url // set session variables by sessionVariables=group_commit=async_mode in JDBC url try (Connection conn = DriverManager.getConnection( String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) { String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int j = 0; j < 5; j++) { // 10 rows per insert for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); stmt.addBatch(); } int[] result = stmt.executeBatch(); } } } catch (Exception e) { e.printStackTrace(); } }
使用INSERT INTO方式
以下分别介绍使用INSERT INTO插入数据的异步和同步模式。
异步模式
-- 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式。 mysql> SET group_commit = async_mode; -- 这里返回的 label 是 group_commit 开头的,用于区分是否使用了 group commit模式。 mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -- 如下 label 和 txn_id 和上述插入的相同,说明是攒到了同一个导入任务中。 mysql> INSERT INTO dt(id, name) VALUES(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -- 不可以立刻查询到导入结果。 mysql> SELECT * FROM dt; Empty SET (0.01 sec) -- 过10 秒后执行查询,可以查询到,可以通过表属性 group_commit_interval 控制数据可见的延迟。 mysql> SELECT * FROM dt; +------+-------+-------+ | id | name | score | +------+-------+-------+ | 1 | Bob | 90 | | 2 | Alice | 99 | | 3 | John | NULL | +------+-------+-------+ 3 rows in set (0.02 sec)
同步模式
-- 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式。 mysql> SET group_commit = sync_mode; -- 导入耗时为表属性group_commit_interval。返回的 label 是 group_commit 开头的,可以区分出是否用了 group commit 模式。 mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99); Query OK, 2 rows affected (10.06 sec) {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} -- 可以立刻查询到导入结果。 mysql> SELECT * FROM dt; +------+-------+-------+ | id | name | score | +------+-------+-------+ | 1 | Bob | 90 | | 2 | Alice | 99 | | 3 | John | NULL | | 4 | Bob | 90 | | 5 | Alice | 99 | +------+-------+-------+ 5 rows in set (0.03 sec)
关闭group commit模式
mysql> SET group_commit = off_mode;
Stream Load
Stream Load详情请参见Stream Load。
新建data.csv文件,内容如下。
6,Amy,60 7,Ross,98
以下分别介绍如何使用Stream Load开启异步模式或同步模式导入数据。
异步模式
# 导入时在 header 中增加配置"group_commit:async_mode"。 curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load { "TxnId": 7009, "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 35, "StreamLoadPutTimeMs": 5, "ReadDataTimeMs": 0, "WriteDataTimeMs": 26 } # 返回的参数 GroupCommit 为 true,说明进入了 group commit 的流程。 # 返回的 Label 是 group_commit 开头,是本次导入关联的 label。
同步模式
# 导入时在 header 中增加配置"group_commit:sync_mode"。 curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load { "TxnId": 3009, "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 10044, "StreamLoadPutTimeMs": 4, "ReadDataTimeMs": 0, "WriteDataTimeMs": 10038 } # 返回的参数 GroupCommit 为 true,说明进入了 group commit 的流程。 # 返回的 Label 是 group_commit 开头的,是本次导入关联的 label。
自动提交条件
当满足时间间隔(默认为10秒) 或数据量(默认为64 MB) 其中一个条件时,会自动提交数据。
提交间隔
Group Commit的默认提交时间间隔为10秒,您可以通过如下语句对某个表的提交间隔时间进行修改,示例如下。
-- 修改提交间隔为 2 秒。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");
提交数据量
Group Commit的默认提交数据量为64 MB,您可以通过如下语句对某个表的提交数据量进行修改,示例如下。
-- 修改提交数据量为 128MB。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");