Stream Load

当您需要将本地文件或数据流导入到云数据库 SelectDB 版实例时,您可以使用Stream Load进行数据同步导入,并通过即时的返回结果判断本次导入是否成功。本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版实例中。

背景信息

Stream Load属于同步接口的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版实例中。Stream Load执行并返回导入结果,您可以通过请求的返回结果判断本次导入是否成功。

Stream Load适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV(文本)、JSONPARQUETORC

创建导入

Stream Load通过HTTP协议提交和传输数据。以下通过curl命令提交导入,也可以通过其他HTTP Client进行操作。

语法

# Header中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

参数说明

参数名称

参数说明

--location-trusted

需要认证时,会将usernamepassword传递给被重定向到的服务器。

-u

指定云数据库 SelectDB 版实例的用户名和密码。

-H

指定本次Stream Load导入请求的请求头(Header)内容。

-T

指定需要导入数据的文件。

-XPUT

HTTP请求的Method,采用PUT请求方法,指定云数据库 SelectDB 版的数据导入地址,其中包括参数如下。

  • host云数据库 SelectDB 版实例的VPC地址或公网地址。

    说明

    申请公网的具体操作,请参见申请和释放公网地址

  • port云数据库 SelectDB 版实例的HTTP端口号,默认为8080。

    说明

    您可以在云数据库 SelectDB 版的实例详情页面查看云数据库 SelectDB 版实例的连接地址和端口号。

  • db_name:数据库名。

  • table_name:数据表名。

Stream Load使用HTTP协议,因此导入任务有关的参数主要设置在请求头(Header)中。常用的导入参数如下。

参数名称

参数说明

label

导入任务的唯一标识。Label是在导入命令中自定义的名称。通过这个Label,可以查看对应导入任务的执行情况。Label也可用于防止重复导入相同的数据,当Label对应的导入作业状态为CANCELLED时,该Label可以再次被使用。

说明

推荐同一批次数据使用相同的Label。这样同一批次数据的重复请求只会被接受一次,保证了At-Most-Once

format

指定导入数据格式,默认值为CSV。支持CSVJSONPARQUETORCcsv_with_names(CSV文件行首过滤)和csv_with_names_and_typesCSV文件前两行过滤)。

line_delimiter

指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。

column_separator

指定导入文件中的列分隔符,默认为\t。可以使用多个字符的组合作为列分隔符。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如:Hive文件的分隔符\x01,需要指定为-H"column_separator:\x01"

compress_type

指定文件的压缩格式。仅支持CSV文件的压缩,支持gzlzobz2lz4lzopdeflate压缩格式。

max_filter_ratio

指定导入任务的最大容忍率,默认为0,即零容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果希望忽略错误的行,可以通过设置这个参数大于0,来保证导入成功。

strict_mode

指定是否开启严格过滤模式,默认为false。开启后,会对导入过程中的列类型转换进行严格过滤,错误的数据将被过滤。

cloud_cluster

指定导入使用的集群。默认为该实例的默认集群。如果该实例没有设置默认集群,则自动选择一个有权限的集群。

load_to_single_tablet

指定是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许在对带有random分区的Duplicate表导入数据的时候设置。

where

指定导入任务的过滤条件。支持对原始数据指定where语句进行过滤,被过滤的数据将不会被导入,也不会参与filter ratio的计算,但会被计入num_rows_unselected

partitions

指定待导入数据的分区(Partition)信息。如果待导入数据不属于指定的分区(Partition)则不会被导入。这些数据将计入dpp.abnorm.ALL。

columns

指定待导入数据的函数变换配置。支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。

merge_type

指定数据合并类型,默认为APPEND。默认值表示本次导入是普通的追加写操作。MERGEDELETE类型仅适用于Unique Key表模型。其中MERGE类型需要配合delete参数使用,以标注Delete Flag列。而DELETE类型则表示本次导入的所有数据皆为删除数据。

delete

仅在指定merge_type类型为MERGE时才具有意义,表示数据的删除条件。

function_column.sequence_col

只适用于UNIQUE_KEYS,相同Key列下,保证Value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。

exec_mem_limit

指定导入内存限制。单位为字节,默认为2147483648,即2 GiB

timeout

指定导入的超时时间,单位:秒,默认值为600。范围为1~259200秒。

timezone

指定本次导入所使用的时区,默认为"Asia/Shanghai",即东八区。该参数会影响所有导入涉及的和时区有关的函数结果。

two_phase_commit

指定是否开启两阶段事务提交模式,默认为false。开启两阶段事务提交模式后,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。

Stream load是一种同步的导入方式,因此导入的结果会通过创建导入的返回值直接返回给用户。返回结果示例如下。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

返回结果参数说明如下。

参数名称

参数说明

TxnId

导入的事务ID。

Label

导入Label,由用户指定或系统自动生成。

Status

导入状态,取值如下:

  • Success:导入成功。

  • Publish Timeout:导入已经完成,只是数据可能会延迟可见,无需重试。

  • Label Already ExistsLabel重复,需更换Label

  • Fail:导入失败。

ExistingJobStatus

已存在的Label对应的导入作业的状态。

这个字段只有在当Status为"Label Already Exists"时才会显示。用户可以通过这个状态,知晓已存在Label对应的导入作业的状态。"RUNNING"表示作业还在执行,"FINISHED"表示作业成功。

Message

错误信息提示。

NumberTotalRows

导入总处理的行数。

NumberLoadedRows

成功导入的行数。

NumberFilteredRows

数据质量不合格的行数。

NumberUnselectedRows

where条件过滤的行数。

LoadBytes

导入的字节数。

LoadTimeMs

导入完成时间,单位毫秒。

BeginTxnTimeMs

向Fe请求开始一个事务所花费的时间,单位:毫秒。

StreamLoadPutTimeMs

向Fe请求获取导入数据执行计划所花费的时间,单位:毫秒。

ReadDataTimeMs

读取数据所花费的时间,单位:毫秒。

WriteDataTimeMs

执行写入数据操作所花费的时间,单位:毫秒。

CommitAndPublishTimeMs

向Fe请求提交并且发布事务所花费的时间,单位:毫秒。

ErrorURL

如果有数据质量问题,通过访问这个URL查看具体错误行。

使用Stream Load导入数据,示例如下。

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

取消导入

无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。

查看Stream Load

您可以通过show stream load来查看已经完成的Stream load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项。​

使用示例

脚本示例

  1. 创建待导入的SelectDB数据表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 16
    PROPERTIES("replication_num" = "1");
  2. 创建待导入文件test.data,示例如下。

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. 使用不同参数配置导入数据,示例如下。

    • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,使用Label用于去重,指定超时时间为100秒。

       curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,使用Label用于去重,指定文件的列名,并且只导入address等于hangzhou的数据。

      curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 将本地文件test.data中的数据导入到数据库test_db中的test_table表,允许20%的错误率。

      curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 导入数据进行严格模式过滤,并设置时区为Africa/Abidjan

      curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 删除与这批导入Key相同的数据。

      curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
    • 将这批数据中address列为hangzhou的数据的行删除,其他行正常追加。

      curl --location-trusted -u root: -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/testDb/testTbl/_stream_load

Java代码示例

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. 参数配置
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. 构建httpclient,特别注意需要开启重定向(isRedirectable)
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // 开启重定向
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. 构建httpPut请求对象
        HttpPut httpPut = new HttpPut(loadUrl);

        // 设置httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. 设置要发送的数据,这里写入csv
        // 假设有一张表,字段如下:
        // field1,field2,field3,field4
        // 这里模拟了三条csv记录,doris 中csv的行分隔符默认是\n,列分隔符默认是\t
        String data =
                "1\t2\t3\t4\n" +
                "11\t22\t33\t44\n" +
                "111\t222\t333\t444";

        httpPut.setEntity(new StringEntity(data));

        // 5. 发送请求,处理结果
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // 选择适合的JSON序列化组件,对返回结果进行序列化
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // 获取SelectDB返回的状态码...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB返回以下状态,都表示数据写入成功
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

相关系统配置

FE配置

stream_load_default_timeout_second:导入任务的超时时间,单位:秒。默认值为600。导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在Stream load请求中设置单独的超时时间或者调整FE的参数stream_load_default_timeout_second来设置全局的默认超时时间。

BE配置

streaming_load_max_mb:Stream load的最大导入大小,单位:MB,默认值为10240。如果您的原始文件超过该值,则需要调整BE参数streaming_load_max_mb

Http Stream模式

在Stream Load中,依托Table Value Function(TVF)功能,可以通过使用SQL表达式来表达导入的参数。这个Stream Load依托TVF功能后名为http_stream。更多Table Value Function(TVF)的使用方式,详情请参见TVF

使用http_stream进行Stream Load导入时的Rest API URL不同于Stream Load普通导入的 URL。

  • 普通Stream Load的URL为:http://host:http_port/api/{db}/{table}/_stream_load

  • 使用TVF http_stream的URL 为:http://host:http_port/api/_http_stream

语法

Stream Load的Http Stream模式。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

Http Stream参数说明请参见参数说明

使用示例

在Http Header中添加一个SQL的参数load_sql,去替代之前参数中的column_separatorline_delimiterwherecolumns等参数,SQL参数load_sql示例如下。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完整示例:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream