云数据库 SelectDB 版兼容标准SQL语法,可通过标准的Insert Into方式导入数据。
背景信息
Insert Into命令是MySQL等数据库中常用的数据导入方式。云数据库 SelectDB 版兼容标准SQL语法,支持通过Insert Into命令导入数据。包含以下两种:
Insert Into tbl SELECT ...
Insert Into tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);
重要此命令不建议在生产环境中使用。
Insert Into Select
通过SelectDB提供的大量SQL函数、联邦查询能力,Insert Into Select可以对SelectDB内部数据、外部数据湖数据等进行高效的计算处理,然后导入SelectDB的新表中,用来进一步进行数据分析服务。
内表数据ETL
如果数据已经在SelectDB表中,可通过Insert Into Select进行数据ETL转换,然后导入到一个新表中。示例如下。
INSERT INTO bj_store_sales
SELECT id, total, user_id, sale_timestamp FROM store_sales WHERE region = "bj";
数据湖数据同步
如果数据在数据湖等外部系统中,可以在SelectDB中创建Catalog,映射到数据湖等外部系统中的数据,然后通过Insert Into Select将其中的数据导入到SelectDB表中。SelectDB支持对接Hive、Iceberg、Hudi、Elasticsearch、JDBC等数据源,详细请参见湖仓一体。
如下以Hive数据源为例,介绍如何同步数据湖数据到SelectDB中。
创建Hive Catalog,即可通过联邦查询访问Hive中的数据,示例如下。
CREATE CATALOG test_catalog comment 'hive catalog' PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://127.0.0.1:7004',
'dfs.nameservices'='HANN',
'dfs.ha.namenodes.HANN'='nn1,nn2',
'dfs.namenode.rpc-address.HANN.nn1'='nn1_host:rpc_port',
'dfs.namenode.rpc-address.HANN.nn2'='nn2_host:rpc_port',
'dfs.client.failover.proxy.provider.HANN'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
通过Insert Into Select,同步Hive数据到SelectDB中,并指定导入作业唯一标识Label。
INSERT INTO bj_store_sales
WITH LABEL test_label
SELECT id, total, user_id, sale_timestamp FROM test_catalog.test_db.store_sales WHERE region = "bj";
Insert Into Values
Insert Into Values是MySQL等数据库中常用的数据写入方式,建议仅用于测试环境的使用。典型的使用方式是直接通过SQL客户端、JDBC程序发送数据写入请求。
创建待导入的SelectDB数据表如下。
CREATE TABLE test_table
(
id int,
name varchar(50),
age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");
SQL示例
BEGIN;
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
COMMIT;
JDBC程序示例
public static void main(String[] args) throws Exception {
// 单次导入插入语句的数量。
int insertNum = 10;
// 单条插入攒批的数量。
int batchSize = 10000;
String URL="jdbc:mysql://<IP地址>:<MySQL协议端口>/test_db?useLocalSessionState=true"; // VPC ID所对应的IP地址。您可以登录VPC控制台在VPC列表中找到目标VPC ID所对应的IP地址。
Connection connection = DriverManager.getConnection(URL, "admin", "password"); // 云数据库SelectDB版实例的账号和密码。
Statement statement = connection.createStatement();
statement.execute("begin");
// 拼接多条插入语句。
for (int num = 0; num < insertNum; num++) {
StringBuilder sql = new StringBuilder();
sql.append("Insert Into test_tbl values ");
for(int i = 0; i < batchSize; i++){
if(i > 0){
sql.append(",");
}
// 拼接一行数据,如:姓名,年龄。可根据具体业务修改。
sql.append("('zhangsan',18)");
}
//add sql to batch: Insert Into tbl values(),(),()
statement.addBatch(sql.toString());
}
statement.addBatch("commit");
statement.executeBatch();
// 关闭资源。
statement.close();
connection.close();
}
最佳实践
查看返回结果。
Insert Into操作是一个同步操作,返回结果即表示操作结束。您需要根据返回结果的不同,进行对应的处理。
执行成功,结果集为空。
如果 insert 对应 select 语句的结果集为空,则返回如下:
INSERT INTO tbl1 SELECT * FROM empty_tbl; Query OK, 0 rows affected (0.02 sec)
Query OK
表示执行成功。0 rows affected
表示没有数据被导入。执行成功,结果集不为空。
在结果集不为空的情况下。返回结果分为如下几种情况。
INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 4 rows affected (0.38 sec) {'label':'insert_8510c568-9eda-****-9e36-6adc7d35291c', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 with label my_label1 SELECT * FROM tbl2; Query OK, 4 rows affected (0.38 sec) {'label':'my_label1', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 2 rows affected, 2 warnings (0.31 sec) {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 2 rows affected, 2 warnings (0.31 sec) {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
其中,
Query OK
表示执行成功。4 rows affected
表示总共有4行数据被导入。2 warnings
表示被过滤的行数。同时会返回一个 JSON 串。{'label':'my_label1', 'status':'visible', 'txnId':'4005'} {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'} {'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}
其中,
label
为您指定的 label 或自动生成的label,label是该Insert Into导入作业的标识,每个导入作业,都有一个在单database内部唯一的label。status
表示导入数据是否可见,如果可见显示visible
,如果不可见显示committed
。txnId
为这个insert对应的导入事务的id。err
字段会显示一些其他非预期错误。当需要查看被过滤的行时,您可以通过如下语句:
SHOW LOAD WHERE label="xxx";
返回结果中的 URL 可以用于查询错误的数据,具体见后面查看错误行小结。数据不可见是一个临时状态,这批数据最终是一定可见的。可以通过如下语句查看这批数据的可见状态:
SHOW TRANSACTION WHERE id=4005;
返回结果中的
TransactionStatus
列如果为visible
,则表述数据可见。执行失败。
执行失败表示没有任何数据被成功导入,并返回如下:
INSERT INTO tbl1 SELECT * FROM tbl2 WHERE k1 = "a"; ERROR 1064 (HY000): all partitions have no load data. url: http://10.74.167.16:8042/api/_load_error_log?file=__shard_2/error_log_insert_stmt_ba8bb9e158e4879-ae8de8507c0bf8a2_ba8bb9e158e4879_ae8de8507c0bf8a2
其中
ERROR 1064 (HY000): all partitions have no load data
显示失败原因。通过其中的 URL 可以用于查询错误的数据:SHOW LOAD WARNINGS ON "url";
超时时间。
Insert Into操作的超时时间由会话变量
query_timeout
控制,默认为5分钟。超时则作业会被取消。Label和原子性。
Insert Into操作同样能够保证导入的原子性。当需要使用
CTE(Common Table Expressions)
作为 Insert Into操作中的查询部分时,必须指定WITH LABEL
和column
部分。过滤阈值。
与其他导入方式不同,Insert Into操作不能指定过滤阈值(
max_filter_ratio
)。默认的过滤阈值为 1,即有错误行都可以被忽略。对于有要求数据不能够被过滤的业务场景,可以通过设置会话变量
enable_insert_strict
为true
来确保当有数据被过滤掉的时候,Insert Into
不会被执行成功。性能问题。
不建议使用
Insert Into Values
方式进行数据导入,尤其是大数据的线上生产环境。如果必须这样使用,请将多行数据合并到一个Insert Into语句中进行批量提交,单个批次建议1000~1000000条数据。部分列更新。
Insert Into的默认行为是整行写入。在Unique数据模型MOW实现方式中,客户可按需开启部分列更新功能,需要设置如下会话变量:
set enable_unique_key_partial_update=true
需要注意的是,控制Insert Into语句是否开启严格模式的会话变量
enable_insert_strict
的默认值为true,意味着不允许更新不存在的key。所以,在使用Insert Into语句进行部分列更新时,如果希望能插入不存在的key,需要同时将enable_insert_strict
设置为false。