当您需要将本地文件或数据流导入到云数据库 SelectDB 版实例时,您可以使用Stream Load进行数据同步导入,并通过即时的返回结果判断本次导入是否成功。本文介绍如何通过Stream Load导入数据至云数据库 SelectDB 版实例中。
背景信息
Stream Load属于同步接口的导入方式,您可以通过发送HTTP请求将本地文件或数据流导入到云数据库 SelectDB 版实例中。Stream Load执行并返回导入结果,您可以通过请求的返回结果判断本次导入是否成功。
Stream Load适用于导入本地文件或通过程序导入数据流中的数据,支持的数据格式包括:CSV
(文本)、JSON
、PARQUET
和ORC
。
我们强烈建议您使用Stream Load作为主要的数据导入方式。
注意事项
发起Stream Load请求的终端与SelectDB网络互通:
为云数据库 SelectDB 版实例申请公网地址。具体操作,请参见申请和释放公网地址。
如果您发起Stream Load请求的终端与云数据库 SelectDB 版实例位于同一VPC下,跳过此步骤。
将发起Stream Load请求终端的相关IP添加至云数据库 SelectDB 版的白名单。具体操作,请参见设置白名单。
若发起Stream Load请求终端存在白名单机制,已将SelectDB实例所在网段IP添加至源集群的白名单中。如何获取SelectDB实例的网段IP,请参见如何查看云数据库 SelectDB 版实例所属VPC的IP网段?。
创建导入
Stream Load通过HTTP协议提交和传输数据。以下通过curl命令提交导入,该命令可在Linux/macOS系统的终端,或者Windows系统下的命令提示符下执行。Stream Load也可以通过其他HTTP Client进行操作。
语法
# Header中支持的属性,请参见下面的参数说明。
# 格式为: -H "key1:value1"。
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_name> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load
参数说明
参数名称 | 参数说明 |
| 需要认证时,会将 |
| 指定云数据库 SelectDB 版实例的用户名和密码。 |
| 指定本次Stream Load导入请求的请求头(Header)内容。 |
| 指定需要导入数据的文件。 |
| HTTP请求的Method,采用PUT请求方法,指定云数据库 SelectDB 版的数据导入地址,其中包括参数如下。
|
Stream Load使用HTTP协议,因此导入任务有关的参数主要设置在请求头(Header)中。常用的导入参数如下。
参数名称 | 参数说明 |
| 导入任务的唯一标识。 说明 推荐同一批次数据使用相同的 |
| 指定导入数据格式,默认值为 |
| 指定导入文件中的换行符,默认为 |
| 指定导入文件中的列分隔符,默认为 |
| 指定文件的压缩格式。仅支持 |
| 指定导入任务的最大容忍率,默认为 |
| 指定是否开启严格过滤模式,默认为 |
| 指定导入使用的集群。默认为该实例的默认集群。如果该实例没有设置默认集群,则自动选择一个有权限的集群。 |
| 指定是否只导入数据到对应分区的一个tablet,默认值为 |
| 指定导入任务的过滤条件。支持对原始数据指定 |
| 指定待导入数据的分区(Partition)信息。如果待导入数据不属于指定的分区(Partition)则不会被导入。这些数据将计入dpp.abnorm.ALL。 |
| 指定待导入数据的函数变换配置。支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 |
| 指定数据合并类型,默认为 |
| 仅在指定 |
| 只适用于UNIQUE_KEYS,相同Key列下,保证Value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。 |
| 指定导入内存限制。单位为字节,默认为 |
| 指定导入的超时时间,单位:秒,默认值为 |
| 指定本次导入所使用的时区,默认为" |
| 指定是否开启两阶段事务提交模式,默认为 |
Stream load是一种同步的导入方式,因此导入的结果会通过创建导入的返回值直接返回给用户。返回结果示例如下。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
返回结果参数说明如下。
参数名称 | 参数说明 |
| 导入的事务ID。 |
| 导入 |
| 导入状态,取值如下:
|
| 已存在的 这个字段只有在当Status为" |
| 错误信息提示。 |
| 导入总处理的行数。 |
| 成功导入的行数。 |
| 数据质量不合格的行数。 |
| 被 |
| 导入的字节数。 |
| 导入完成时间,单位毫秒。 |
| 向Fe请求开始一个事务所花费的时间,单位:毫秒。 |
| 向Fe请求获取导入数据执行计划所花费的时间,单位:毫秒。 |
| 读取数据所花费的时间,单位:毫秒。 |
| 执行写入数据操作所花费的时间,单位:毫秒。 |
| 向Fe请求提交并且发布事务所花费的时间,单位:毫秒。 |
| 如果有数据质量问题,通过访问这个URL查看具体错误行。 |
使用Stream Load导入数据,示例如下。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
取消导入
无法手动取消Stream Load,Stream Load在超时或者导入错误后会被系统自动取消。
查看Stream Load
您可以通过MySQL客户端连接到云数据库 SelectDB 版实例,通过执行show stream load
语句来查看已经完成的Stream Load任务。默认BE(BackEnd)不保留Stream Load的启用记录,如果您要查看则需要在BE上启用记录,配置参数为:enable_stream_load_record=true,具体操作请参见BE配置项。
导入数据的完整示例
准备工作
开始导入操作前,您需确保您的终端环境与SelectDB实例网络处于互通状态,具体操作,请参见注意事项。
操作步骤
通过脚本导入示例
创建待导入数据的表。
连接SelectDB实例。具体操作,请参见通过MySQL客户端连接云数据库SelectDB版实例。
执行建库语句。
CREATE DATABASE test_db;
执行建表语句。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
在发起Stream Load的终端,创建待导入文件
test.data
。1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.com
导入数据。
打开本地环境的终端,通过curl命令发起Stream Load任务,导入数据。
创建导入任务的具体语法以及参数说明,请参见创建导入,以下为常见导入场景的示例。
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,使用Label
用于去重,指定超时时间为100秒。curl --location-trusted -u root -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,使用Label用于去重,指定文件的列名,并且只导入address等于hangzhou的数据。curl --location-trusted -u root -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.data http://host:port/api/test_db/test_table/_stream_load
将本地文件
test.data
中的数据导入到数据库test_db
中的test_table
表,允许20%的错误率。curl --location-trusted -u root -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
导入数据进行严格模式过滤,并设置时区为
Africa/Abidjan
。curl --location-trusted -u root -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
删除与这批导入Key相同的数据。
curl --location-trusted -u root -H "merge_type: DELETE" -H "expect:100-continue" -T test.data http://host:port/api/test_db/test_table/_stream_load
将这批数据中address列为hangzhou的数据的行删除,其他行正常追加。
curl --location-trusted -u root: -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.data http://host:port/api/testDb/testTbl/_stream_load
通过Java代码导入示例
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. 参数配置
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. 构建httpclient,特别注意需要开启重定向(isRedirectable)
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// 开启重定向
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. 构建httpPut请求对象
HttpPut httpPut = new HttpPut(loadUrl);
// 设置httpHeader...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. 设置要发送的数据,这里写入csv
// 假设有一张表,字段如下:
// field1,field2,field3,field4
// 这里模拟了三条csv记录,doris 中csv的行分隔符默认是\n,列分隔符默认是\t
String data =
"1\t2\t3\t4\n" +
"11\t22\t33\t44\n" +
"111\t222\t333\t444";
httpPut.setEntity(new StringEntity(data));
// 5. 发送请求,处理结果
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// 选择适合的JSON序列化组件,对返回结果进行序列化
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// 获取SelectDB返回的状态码...
String dorisStatus = respAsMap.get("Status");
// SelectDB返回以下状态,都表示数据写入成功
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}
相关系统配置
FE配置
stream_load_default_timeout_second
:导入任务的超时时间,单位:秒。默认值为600。导入任务在设定的timeout时间内未完成则会被系统取消,变成CANCELLED。如果导入的源文件无法在规定时间内完成导入,您可以在Stream load请求中设置单独的超时时间或者调整FE的参数stream_load_default_timeout_second
来设置全局的默认超时时间。
BE配置
streaming_load_max_mb
:Stream load的最大导入大小,单位:MB,默认值为10240。如果您的原始文件超过该值,则需要调整BE参数streaming_load_max_mb
。
Http Stream模式
在Stream Load中,依托Table Value Function(TVF)功能,可以通过使用SQL表达式来表达导入的参数。这个Stream Load依托TVF功能后名为http_stream
。更多Table Value Function(TVF)的使用方式,详情请参见TVF。
使用http_stream进行Stream Load导入时的Rest API URL不同于Stream Load普通导入的 URL。
普通Stream Load的URL为:
http://host:http_port/api/{db}/{table}/_stream_load
。使用TVF http_stream的URL 为:
http://host:http_port/api/_http_stream
。
语法
Stream Load的Http Stream模式。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream
Http Stream参数说明请参见参数说明。
使用示例
在Http Header中添加一个SQL
的参数load_sql,去替代之前参数中的column_separator
、line_delimiter
、where
、columns
等参数,SQL
参数load_sql示例如下。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");
完整示例:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream