Broker Load

本文介绍如何通过Broker Load导入数据至云数据库 SelectDB 版实例。

背景信息

Broker Load是一种异步的导入方式,通过读取远端存储(如HDFS、S3)上的数据,导入数据到云数据库 SelectDB 版的表中。您可通过MySQL协议创建Broker Load导入,并通过SHOW LOAD命令检查导入结果。单次导入数据量最多可支持百GB级别。

创建导入

该方式用于通过Broker导入,读取远端存储(如HDFS、S3)上的数据导入到云数据库 SelectDB 版的表中。

语法

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

参数说明

参数名称

参数说明

load_label

导入任务的唯一标识。Label是在导入命令中自定义的名称。通过Label,可以查看对应导入任务的执行情况。Label也可用于防止重复导入相同的数据,当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。

格式:[database.]label_name

说明

推荐同一批次数据使用相同的Label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once。

data_desc1

描述一组需要导入的文件。详细参数说明,请参见data_desc1参数说明

WITH broker_type

指定需要使用的Broker类型,支持HDFS、S3两种。S3类型的Broker Load也称为OSS Load,详情请参见OSS Load

broker_properties

指定Broker所需的参数让Broker能够访问远端存储系统。例如:BOS或HDFS。

语法如下:

( "key1" = "val1", "key2" = "val2", ...)

load_properties

指定导入的相关参数。详细参数说明,请参见load_properties参数说明

data_desc1参数说明

[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

参数名称

参数说明

[MERGE|APPEND|DELETE]

指定数据合并类型。默认为APPEND。默认值表示本次导入是普通的追加写操作。MERGE和DELETE类型仅适用于Unique Key模型表。其中MERGE类型需要配合[DELETE ON]语句使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。

DATA INFILE

指定需要导入的文件路径。需要导入的文件路径可以是多个,可以使用通配符。路径最终必须匹配到文件,如果只匹配到目录则导入失败。

NEGATIVE

表示本次导入为一批“负”导入。这种方式仅针对具有整型SUM聚合类型的聚合数据表。该方式会将导入数据中SUM聚合列对应的整型数值取反。用于冲抵之前导入错误的数据。

PARTITION(p1, p2, ...)

指定仅导入表的某些分区,不在分区范围内的数据将被忽略。

COLUMNS TERMINATED BY

指定列分隔符,仅在CSV格式下有效,仅能指定单字节分隔符。

FORMAT AS

指定文件类型,默认为CSV。支持CSV、PARQUET和ORC格式。

column list

指定原始文件中的列顺序。

COLUMNS FROM PATH AS

指定从导入的文件中抽取的列。

PRECEDING FILTER predicate

指定前置过滤条件。数据首先根据column listCOLUMNS FROM PATH AS按顺序拼接成原始数据行。然后按照前置过滤条件进行过滤。

SET (column_mapping)

指定列的转换函数。

WHERE predicate

指定数据的过滤条件。

DELETE ON expr

需配合MERGE导入模式一起使用,仅针对Unique Key模型的表。用于指定导入数据中表示Delete Flag的列和计算关系。

ORDER BY

仅针对使用Unique Key模型的表。用于指定导入数据中表示Sequence Col的列。主要用于导入时保证数据顺序。

PROPERTIES ("key1"="value1", ...)

指定导入的format的一些参数。如导入的文件是JSON格式,则可以在这里指定json_root、jsonpaths、fuzzy_parse等参数。

load_properties参数说明

参数名称

参数说明

timeout

导入超时时间,单位为秒,默认为14400,即4小时。

max_filter_ratio

最大容忍可过滤比率(数据不规范等原因)。默认0,即零容忍。取值范围为0~1。

exec_mem_limit

导入内存限制,单位为字节,默认为2147483648,即2 GB。

strict_mode

设置导入任务是否开启严格模式,默认为false。

timezone

指定某些受时区影响的函数的时区,默认为Asia/Shanghai时区。支持strftime、alignment_timestamp和from_unixtime等。

load_parallelism

导入并发度,默认为1。调大导入并发度会启动多个执行计划同时执行导入任务,加快导入速度。

send_batch_parallelism

用于设置发送批处理数据的并行度,如果并行度的值超过计算集群BE配置中的max_send_batch_parallelism_per_job,那么计算集群将使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分桶的Duplicate表导入数据的时候设置。

使用示例

  1. 创建待导入的SelectDB数据表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
    
    CREATE TABLE test_table2
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 创建待导入的文件。

    • 文件file1.txt,文件内容如下。

      1,tomori,32,shanghai
      2,anon,22,beijing
      3,taki,23,shenzhen
      4,rana,45,hangzhou
      5,soyo,14,shanghai
      6,saki,25,hangzhou
      7,mutsumi,45,shanghai
      8,uika,26,shanghai
      9,umiri,27,shenzhen
      10,nyamu,37,shanghai
    • 文件file2.csv,文件内容如下。

      1,saki,25,hangzhou
      2,mutsumi,45,shanghai
      3,uika,26,shanghai
      4,umiri,27,shenzhen
      5,nyamu,37,shanghai
  3. 将文件数据导入到表中。

    • 从HDFS导入文本文件file1.txt,示例如下。

      LOAD LABEL example_db.label1
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `my_table`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      导入文件file1.txt,按逗号分隔,导入到表test_table。当从HDFS导入数据时,broker_properties中需要指定fs.defaultFS属性,以确保可以正确的连接到HDFS集群并找到相应的数据文件。

    • 从HDFS导入数据,同时导入两个文件到两个表中,示例如下。

      LOAD LABEL test_db.test_02
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,temp_age,address)    
          SET (
              age = temp_age + 1
          ),
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      导入两个文件file1.txtfile2.csv,分别导入到test_tabletest_table2两张表中,并且将file2.csv中age的值加1后导入。

    • 从HA模式部署的HDFS集群中,导入一批数据,示例如下。

      LOAD LABEL test_db.test_03
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY "\\x01"
      )
      WITH HDFS
      (
          "hadoop.username" = "hive",
          "fs.defaultFS" = "hdfs://my_ha",
          "dfs.nameservices" = "my_ha",
          "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
          "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
          "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
          "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      );

      指定分隔符为Hive的默认分隔符\\x01,并使用通配符*指定data目录下所有目录的所有文件。

    • 对导入数据file1.txt进行过滤处理,符合条件的数据才可导入,示例如下。

      LOAD LABEL test_db.test_04
      (
          DATA INFILE("hdfs://host:port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          WHERE age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );
      

      只有原始数据中age < 20的行才会被导入。

    • 从HDFS导入一批数据file1.txt,指定超时时间和过滤比例,并且将原有数据中与导入数据中age<20的列相匹配的行删除,其他行正常导入,示例如下。

      LOAD LABEL test_db.test_05
      (
          MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          DELETE ON age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      )
      PROPERTIES
      (
          "timeout" = "3600",
          "max_filter_ratio" = "0.1"
      );

      使用MERGE方式导入。test_table必须是一张Unique Key的表。当导入数据中的age列的值小于20时,该行会被认为是一个删除行。导入任务的超时时间是3600秒,并且允许错误率在10%以内。

取消导入

当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消导入。取消时需要指定待取消导入任务的Label。导入任务取消后,已写入的数据也会回滚,不会生效。

语法

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

参数说明

参数名称

参数说明

db_name

数据库名称。不指定的时使用当前默认数据库。

load_label

导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_pattern的导入任务。

使用示例

  • 撤销数据库example_db上,Label为example_db_test_load_label的导入作业。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL = "example_db_test_load_label";
  • 撤销数据库example_db上,所有包含example_的导入作业。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL like "example_";

查看导入

Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过SHOW LOAD命令查看。

语法

SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

参数说明

参数名称

参数说明

db_name

数据库名称。不指定的场合使用当前默认数据库。

your_label

导入任务的Label名称,精确匹配。如果使用LABEL LIKE,则会匹配导入任务的Label包含label_matcher的导入任务。

STATE

导入状态。只查看指定状态的导入任务。

ORDER BY

指定排序依据。

LIMIT

显示Limit条匹配记录。不指定的场合全部显示。

OFFSET

从偏移量offset开始显示查询结果。默认情况下偏移量为0。

使用示例

  • 展示数据库example_db的导入任务,Label中包含字符串2014_01_02,展示保存时间最久的10个。

    SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
  • 展示数据库example_db的导入任务,指定Label为load_example_db_20140102并按LoadStartTime降序排序。

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
  • 展示数据库example_db的导入任务,指定Label为load_example_db_20140102,state为loading

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
  • 展示数据库example_db的导入任务,按LoadStartTime降序排序,并从偏移量5开始显示10条查询结果。

    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;

最佳实践

  • 查看导入任务状态

    Broker Load是一个异步导入过程,语句执行成功仅代表导入任务提交成功,并不代表数据导入成功。导入状态需要通过SHOW LOAD命令查看。

  • 取消导入任务

    已提交切尚未结束的导入任务可以通过CANCEL LOAD命令取消。取消后,已写入的数据也会回滚,不会生效。

  • Label、导入事务、多表原子性

    SelectDB中所有导入任务都是原子生效的。并且在同一个导入任务中对多张表的导入也能够保证原子性。同时,SelectDB还可以通过Label的机制来保证数据导入的不丢失和不重复。

  • 列映射、衍生列和过滤

    SelectDB可以在导入语句中支持非常丰富的列转换和过滤操作。支持绝大多数内置函数和UDF。详情请参见数据转换文档。

  • 错误数据过滤

    SelectDB的导入任务可以容忍一部分格式错误的数据。容忍了通过max_filter_ratio设置。默认为0,即表示当有一条错误数据时,整个导入任务将会失败。如果您希望忽略部分有问题的数据行,可以将次参数设置为0~1之间的数值,SelectDB会自动跳过哪些数据格式不正确的行。

    关于容忍率的一些计算方式,详情请参见数据转换文档。

  • 严格模式

    strict_mode属性用于设置导入任务是否运行在严格模式下。该格式会对列映射、转换和过滤的结果产生影响。

  • 超时时间

    Broker Load的默认超时时间为4小时,从任务提交开始算起。如果超时未完成,则任务会失败。

  • 数据量和任务数限制

    建议通过Broker Load单次导入100 GB以内的数据。虽然理论上在一个导入任务中导入的数据量没有上限,但是提交过大的导入会导致运行时间较长,并且失败后重试的代价也会较大。

    同时受限于集群规模,我们限制了导入的最大数据量为节点数乘3 GB。以保证系统资源的合理利用。如果有大数据量需要导入,建议分成多个导入任务提交。

    SelectDB会限制集群内同时运行的导入任务数量,通常在3~10个之间,之后提交的导入作业会排队等待。队列最大长度为100,之后的提交会直接拒绝。

    说明

    排队时间也会被计算到作业总时间中。如果超时,则作业会被取消。所以建议通过监控作业运行状态来合理控制作业提交频率。