Group Commit

本文介绍如何使用分组导入(Group Commit)模式在高并发小写入的场景下来提升导入性能。

概述

Group Commit不是一种新的导入方式,而是对INSERT INTO tbl VALUES(...)Stream Load的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用JDBC将数据高频写入云数据库 SelectDB 版,同时通过使用PreparedStatement可以获得更高的性能。在日志场景下,您也可以利用Stream Load或者Http Stream将数据高频写入云数据库 SelectDB 版。Group Commit写入有如下三种模式。

  • 关闭模式(off_mode)

    不开启Group Commit,保持以上三种导入方式的默认行为。

  • 同步模式(sync_mode)

    SelectDB会根据负载和表的group_commit_interval属性将多个导入在一个事务提交,事务提交后返回导入结果。这适用于高并发写入场景,且在导入完成后要求数据立即可见。

  • 异步模式(async_mode)

    异步模式适用于写入延迟敏感以及高频写入的场景。SelectDB首先将数据写入WAL(Write Ahead Log),然后立即返回导入结果。SelectDB会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据可见。为了防止WAL占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode

使用限制

  • 当开启了Group Commit模式,系统会判断您发起的INSERT INTO VALUES语句是否符合Group Commit的条件,如果符合,该语句的执行会进入到Group Commit写入中。但符合如下条件的语句会自动退化为非Group Commit方式。

    • 事务写入:即Begin;INSERT INTO VALUES;COMMIT方式

    • 指定Label:即INSERT INTO dt WITH LABEL {label} VALUES

    • VALUES中包含表达式:即INSERT INTO dt VALUES (1 + 100)

    • 列更新写入。

    • 表不支持light schema change。

  • 当开启了Group Commit模式,系统会判断您发起的Stream LoadHttp Stream是否符合Group Commit的条件,如果符合,该导入的执行会进入到Group Commit写入中。但符合如下条件的会自动退化为非Group Commit方式。

    • 指定Label:即通过-H "label:my_label"设置。

    • 两阶段提交。

    • 列更新写入。

    • 表不支持light schema change。

  • 对于Unique模型,由于Group Commit不能保证提交顺序,您可以配合Sequence列使用来保证数据一致性。

  • max_filter_ratio语义的支持。

    • 在默认的导入中,filter_ratio是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入。

    • 在Group Commit模式下,由于多个客户端发起的导入都会由一个内部导入执行,虽然可以计算出每个导入的filter_ratio,但是数据一旦进入内部导入,就只能commit transaction。

    • Group Commit模式支持了一定程度的max_filter_ratio语义,当导入的总行数不高于group_commit_memory_rows_for_max_filter_ratio(BE配置参数,默认为10000行),max_filter_ratio正常工作。

  • WAL限制

    • 对于async_mode的Group Commit写入,会把数据写入WAL。如果内部导入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据。

    • 对于async_mode的Group Commit写入,为了保护磁盘空间,当遇到如下情况时,会切换成sync_mode

      • 导入数据量过大,即超过WAL单目录的80%空间。

      • 数据量规模不确定的chunked stream load。

      • 导入数据量不大,但磁盘可用空间不足。

    • 当发生重量级Schema Change(目前加减列、修改varchar长度和重命名列是轻量级Schema Change,其它的是重量级Schema Change)时,为了保证WAL能够适配表的Schema,在Schema Change最后的修改元数据阶段,会拒绝Group Commit写入,客户端收到insert table ${table_name} is blocked on schema change 异常,请您在客户端重试即可。

使用示例

创建表dt,示例如下。

CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;

使用JDBC方式

当您使用JDBCINSERT INTO VALUES方式写入时,为了减少SQL解析和生成规划的开销,云数据库 SelectDB 版支持了MySQL协议的PreparedStatement特性。当您使用PreparedStatement时,SQL和其导入规划将被缓存到Session级别的内存缓存中,后续的导入直接使用缓存对象,降低了集群的CPU压力。下面是在JDBC中使用PreparedStatement的使用示例。

  1. 添加依赖。

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
  2. 设置JDBC URL并在Server端开启PreparedStatement。

    url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
  3. 配置group_commitsession变量,有如下两种方式。

    • 通过JDBC URL设置,增加参数sessionVariables=group_commit=async_mode

      url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
    • 通过执行SQL设置

      try (Statement statement = conn.createStatement()) {
          statement.execute("SET group_commit = async_mode;");
      }
  4. 使用PreparedStatement

        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true";
        private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com";
        private static final int PORT = 9030;
        private static final String DB = "db";
        private static final String TBL = "dt";
        private static final String USER = "admin";
        private static final String PASSWD = "***";
        private static final int INSERT_BATCH_SIZE = 10;
        
        public static void main(String[] args) {
            groupCommitInsert();
            //groupCommitInsertBatch
        }
        
        private static void groupCommitInsert() throws Exception {
            Class.forName(JDBC_DRIVER);
            try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
                // set session variable 'group_commit'
                try (Statement statement = conn.createStatement()) {
                    statement.execute("SET group_commit = async_mode;");
                }
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "name" + i);
                        stmt.setInt(3, i + 10);
                        int result = stmt.executeUpdate();
                        System.out.println("rows: " + result);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void groupCommitInsertBatch() throws Exception {
            Class.forName(JDBC_DRIVER);
            // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url
            // set session variables by sessionVariables=group_commit=async_mode in JDBC url
            try (Connection conn = DriverManager.getConnection(
                    String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) {
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int j = 0; j < 5; j++) {
                        // 10 rows per insert
                        for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                            stmt.setInt(1, i);
                            stmt.setString(2, "name" + i);
                            stmt.setInt(3, i + 10);
                            stmt.addBatch();
                        }
                        int[] result = stmt.executeBatch();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

使用INSERT INTO方式

以下分别介绍使用INSERT INTO插入数据的异步和同步模式。

  • 异步模式

    -- 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式。
    mysql> SET group_commit = async_mode;
    
    -- 这里返回的 label 是 group_commit 开头的,用于区分是否使用了 group commit模式。
    mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
    Query OK, 2 rows affected (0.05 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- 如下 label 和 txn_id 和上述插入的相同,说明是攒到了同一个导入任务中。
    mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
    Query OK, 1 row affected (0.01 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- 不可以立刻查询到导入结果。
    mysql> SELECT * FROM dt;
    Empty SET (0.01 sec)
    
    -- 过10 秒后执行查询,可以查询到,可以通过表属性 group_commit_interval 控制数据可见的延迟。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    +------+-------+-------+
    3 rows in set (0.02 sec)
  • 同步模式

    -- 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式。
    mysql> SET group_commit = sync_mode;
    
    -- 导入耗时为表属性group_commit_interval。返回的 label 是 group_commit 开头的,可以区分出是否用了 group commit 模式。
    mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
    Query OK, 2 rows affected (10.06 sec)
    {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
    
    -- 可以立刻查询到导入结果。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    |    4 | Bob   |    90 |
    |    5 | Alice |    99 |
    +------+-------+-------+
    5 rows in set (0.03 sec)
  • 关闭group commit模式

    mysql> SET group_commit = off_mode;

Stream Load

Stream Load详情请参见Stream Load

  1. 新建data.csv文件,内容如下。

    6,Amy,60
    7,Ross,98
  2. 以下分别介绍如何使用Stream Load开启异步模式或同步模式导入数据。

    • 异步模式

      # 导入时在 header 中增加配置"group_commit:async_mode"。
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 7009,
          "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 35,
          "StreamLoadPutTimeMs": 5,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 26
      }
      
      # 返回的参数 GroupCommit 为 true,说明进入了 group commit 的流程。
      # 返回的 Label 是 group_commit 开头,是本次导入关联的 label。
    • 同步模式

      # 导入时在 header 中增加配置"group_commit:sync_mode"。
      
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 3009,
          "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 10044,
          "StreamLoadPutTimeMs": 4,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 10038
      }
      
      # 返回的参数 GroupCommit 为 true,说明进入了 group commit 的流程。
      # 返回的 Label 是 group_commit 开头的,是本次导入关联的 label。

自动提交条件

当满足时间间隔(默认为10秒) 或数据量(默认为64 MB) 其中一个条件时,会自动提交数据。

提交间隔

Group Commit的默认提交时间间隔为10秒,您可以通过如下语句对某个表的提交间隔时间进行修改,示例如下。

-- 修改提交间隔为 2 秒。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

提交数据量

Group Commit的默认提交数据量为64 MB,您可以通过如下语句对某个表的提交数据量进行修改,示例如下。

-- 修改提交数据量为 128MB。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");