本文介绍如何通过Broker Load导入数据至云数据库 SelectDB 版实例。
背景信息
Broker Load是一种异步的导入方式,通过读取远端存储(如HDFS、S3)上的数据,导入数据到云数据库 SelectDB 版的表中。您可通过MySQL协议创建Broker Load导入,并通过SHOW LOAD
命令检查导入结果。单次导入数据量最多可支持百GB级别。
创建导入
该方式用于通过Broker导入,读取远端存储(如HDFS、S3)上的数据导入到云数据库 SelectDB 版的表中。
语法
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];
参数说明
参数名称 | 参数说明 |
| 导入任务的唯一标识。Label是在导入命令中自定义的名称。通过Label,可以查看对应导入任务的执行情况。Label也可用于防止重复导入相同的数据,当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。 格式: 说明 推荐同一批次数据使用相同的Label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。 |
| 描述一组需要导入的文件。详细参数说明,请参见data_desc1参数说明。 |
| 指定需要使用的Broker类型,支持HDFS、S3两种。S3类型的Broker Load也称为OSS Load,详情请参见OSS Load。 |
| 指定Broker所需的参数让Broker能够访问远端存储系统。例如:BOS或HDFS。 语法如下:
|
| 指定导入的相关参数。详细参数说明,请参见load_properties参数说明。 |
data_desc1参数说明
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]
参数名称 | 参数说明 |
| 指定数据合并类型。默认为APPEND。默认值表示本次导入是普通的追加写操作。MERGE和DELETE类型仅适用于Unique Key模型表。其中MERGE类型需要配合[DELETE ON]语句使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。 |
| 指定需要导入的文件路径。需要导入的文件路径可以是多个,可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入失败。 |
| 表示本次导入为一批“负”导入。这种方式仅针对具有整型SUM聚合类型的聚合数据表。该方式会将导入数据中SUM聚合列对应的整型数值取反。用于冲抵之前导入错误的数据。 |
| 指定仅导入表的某些分区,不在分区范围内的数据将被忽略。 |
| 指定列分隔符,仅在CSV格式下有效,仅能指定单字节分隔符。 |
| 指定文件类型,默认为CSV。支持CSV、PARQUET和ORC格式。 |
| 指定原始文件中的列顺序。 |
| 指定从导入的文件中抽取的列。 |
| 指定前置过滤条件。数据首先根据 |
| 指定列的转换函数。 |
| 指定数据的过滤条件。 |
| 需配合MERGE导入模式一起使用,仅针对Unique Key模型的表。用于指定导入数据中表示Delete Flag的列和计算关系。 |
| 仅针对使用Unique Key模型的表。用于指定导入数据中表示Sequence Col的列。主要用于导入时保证数据顺序。 |
| 指定导入的format的一些参数。如导入的文件是JSON格式,则可以在这里指定json_root、jsonpaths、fuzzy_parse等参数。 |
load_properties参数说明
参数名称 | 参数说明 |
| 导入超时时间,单位为秒,默认为14400,即4小时。 |
| 最大容忍可过滤比率(数据不规范等原因)。默认0,即零容忍。取值范围为0~1。 |
| 导入内存限制,单位为字节,默认为2147483648,即2 GB。 |
| 设置导入任务是否开启严格模式,默认为false。 |
| 指定某些受时区影响的函数的时区,默认为 |
| 导入并发度,默认为1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。 |
| 用于设置发送批处理数据的并行度,如果并行度的值超过计算集群BE配置中的max_send_batch_parallelism_per_job,那么计算集群将使用max_send_batch_parallelism_per_job的值。 |
| 是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分桶的Duplicate表导入数据的时候设置。 |
使用示例
创建待导入的SelectDB数据表,示例如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");
创建待导入的文件。
文件
file1.txt
,文件内容如下。1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghai
文件
file2.csv
,文件内容如下。1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
将文件数据导入到表中。
从HDFS导入文本文件
file1.txt
,示例如下。LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );
导入文件file1.txt,按逗号分隔,导入到表test_table。当从HDFS导入数据时,broker_properties中需要指定
fs.defaultFS
属性,以确保可以正确的连接到HDFS集群并找到相应的数据文件。从HDFS导入数据,同时导入两个文件到两个表中,示例如下。
LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );
导入两个文件
file1.txt
和file2.csv
,分别导入到test_table
和test_table2
两张表中,并且将file2.csv
中age的值加1后导入。从HA模式部署的HDFS集群中,导入一批数据,示例如下。
LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );
指定分隔符为Hive的默认分隔符
\\x01
,并使用通配符*
指定data
目录下所有目录的所有文件。对导入数据file1.txt进行过滤处理,符合条件的数据才可导入,示例如下。
LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );
只有原始数据中
age < 20
的行才会被导入。从HDFS导入一批数据file1.txt,指定超时时间和过滤比例,并且将原有数据中与导入数据中
age<20
的列相匹配的行删除,其他行正常导入,示例如下。LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );
使用MERGE方式导入。
test_table
必须是一张Unique Key的表。当导入数据中的age列的值小于20
时,该行会被认为是一个删除行。导入任务的超时时间是3600秒,并且允许错误率在10%以内。
取消导入
当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消导入。取消时需要指定待取消导入任务的Label。导入任务取消后,已写入的数据也会回滚,不会生效。
语法
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
参数说明
参数名称 | 参数说明 |
| 数据库名称。不指定的时使用当前默认数据库。 |
| 导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_pattern的导入任务。 |
使用示例
撤销数据库
example_db
上,Label为example_db_test_load_label
的导入作业。CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";
撤销数据库
example_db
上,所有包含example_
的导入作业。CANCEL LOAD FROM example_db WHERE LABEL like "example_";
查看导入
Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过SHOW LOAD
命令查看。
语法
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];
参数说明
参数名称 | 参数说明 |
| 数据库名称。不指定的场合使用当前默认数据库。 |
| 导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_matcher的导入任务。 |
| 导入状态。只查看指定状态的导入任务。 |
| 指定排序依据。 |
| 显示Limit条匹配记录。不指定的场合全部显示。 |
| 从偏移量offset开始显示查询结果。默认情况下偏移量为0。 |
使用示例
展示数据库
example_db
的导入任务,Label中包含字符串2014_01_02
,展示保存时间最久的10个。SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
展示数据库
example_db
的导入任务,指定Label为load_example_db_20140102
并按LoadStartTime
降序排序。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
展示数据库
example_db
的导入任务,指定Label为load_example_db_20140102
,state为loading
。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
展示数据库
example_db
的导入任务,按LoadStartTime
降序排序,并从偏移量5开始显示10条查询结果。SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
最佳实践
查看导入任务状态
Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过
SHOW LOAD
命令查看。取消导入任务
已提交切尚未结束的导入任务可以通过
CANCEL LOAD
命令取消。取消后,已写入的数据也会回滚,不会生效。Label、导入事务、多表原子性
SelectDB中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,SelectDB还可以通过Label的机制来保证数据导入的不丢失和不重复。
列映射、衍生列和过滤
SelectDB可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和UDF。详情请参见数据转换文档。
错误数据过滤
SelectDB的导入任务可以容忍一部分格式错误的数据。容忍了通过
max_filter_ratio
设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果您希望忽略部分有问题的数据行,可以将次参数设置为0~1之间的数值,SelectDB会自动跳过哪些数据格式不正确的行。关于容忍率的一些计算方式,详情请参见数据转换文档。
严格模式
strict_mode
属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。超时时间
Broker Load的默认超时时间为4小时,从任务提交开始算起。如果超时未完成,则任务会失败。
数据量和任务数限制
建议通过Broker Load单次导入100 GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限,但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会较大。
同时受限于集群规模,我们限制了导入的最大数据量为节点数乘3 GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。
SelectDB会限制集群内同时运行的导入任务数量,通常在3~10个之间,之后提交的导入作业会排队等待。队列最大长度为100,之后的提交会直接拒绝。
说明排队时间也会被计算到作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。