云数据库 SelectDB 版集成BitSail,支持使用SelectDB Sink导入表数据至云数据库 SelectDB 版。本文将为您介绍使用SelectDB Sink同步数据至云数据库 SelectDB 版的使用方式。
概述
BitSail是一款基于分布式架构的高性能数据集成引擎,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下的全域数据集成解决方案。您可以通过BitSail引擎读取MySQL、Hive、Kafka等数据源中的海量数据,然后由SelectDB Sink将数据写入到云数据库 SelectDB 版。
前提条件
BitSail 0.1.0版本及以上。
使用方式
BitSail的job.writer
中配置SelectDB连接器参数,示例如下。
{
"job": {
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "<selectdb_http_address>",
"jdbc_url": "<selectdb_mysql_address>",
"cluster_name": "<selectdb_cluster_name>",
"user": "<username>",
"password": "<password>",
"table_identifier": "<selectdb_table_identifier>",
"columns": [
{
"index": 0,
"name": "id",
"type": "int"
},
{
"index": 1,
"name": "bigint_type",
"type": "bigint"
},
{
"index": 2,
"name": "string_type",
"type": "varchar"
},
{
"index": 3,
"name": "double_type",
"type": "double"
},
{
"index": 4,
"name": "date_type",
"type": "date"
}
]
}
}
}
参数说明如下。
参数名称 | 是否必填 | 参数含义 |
class | 是 | 云数据库 SelectDB 版写连接器类型,默认为: |
load_url | 是 | 云数据库 SelectDB 版实例的访问地址和HTTP协议端口。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口。 示例: |
jdbc_url | 是 | 云数据库 SelectDB 版实例的访问地址和MySQL协议端口。 您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口。 示例: |
cluster_name | 是 | 云数据库 SelectDB 版实例的集群名。实例中可能包含多个集群,可按需选择。 |
user | 是 | 云数据库 SelectDB 版实例的用户名。 |
password | 是 | 云数据库 SelectDB 版实例对应用户名的密码。 |
table_identifier | 是 | 云数据库 SelectDB 版实例的表名,格式为 |
writer_parallelism_num | 否 | 指定SelectDB写并发数量。 |
sink_flush_interval_ms | 否 | Upsert模式下的flush间隔,单位毫秒;默认为:5000。 |
sink_max_retries | 否 | 写入的最大重试次数,默认为:3。 |
sink_buffer_size | 否 | 写入buffer最大值,单位字节;默认为:1048576 (1 MB)。 |
sink_buffer_count | 否 | 初始化buffer的数量,默认为:3。 |
sink_enable_delete | 否 | 是否支持Delete事件同步。 |
sink_write_mode | 否 | 写入模式,目前仅支持BATCH_UPSERT。 |
stream_load_properties | 否 | 追加在Stream Load URL后的参数,以Map<String,String>格式组成。 |
load_contend_type | 否 | copy-into使用的格式。取值范围为CSV或JSON。默认JSON。 |
csv_field_delimiter | 否 | CSV格式的行内分隔符,默认为:逗号","。 |
csv_line_delimiter | 否 | CSV格式的行间分隔符,默认为:"\n"。 |
使用示例
下文使用BItSail构造数据源为例,为您介绍如何通过BitSail将上游数据导入至云数据库 SelectDB 版。
环境准备
配置BitSail环境,下载并解压BitSail安装包。
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz tar -zxvf bitsail.tar.gz
配置云数据库 SelectDB 版实例。
创建云数据库 SelectDB 版实例,详情请参见创建实例。
通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
CREATE TABLE `test_table` ( `id` BIGINT(20) NULL, `bigint_type` BIGINT(20) NULL, `string_type` VARCHAR(100) NULL, `double_type` DOUBLE NULL, `decimal_type` DECIMALV3(27, 9) NULL, `date_type` DATEV2 NULL, `partition_date` DATEV2 NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "light_schema_change" = "true" );
开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址。
将BitSail环境的公网IP添加到IP白名单中,详情请参见设置白名单。
通过BitSail本地任务同步数据到SelectDB
创建配置文件
test.json
,配置任务信息。通过bitsail包中的FakeSource类构造本地数据进行导入。{ "job": { "common": { "job_id": -2413, "job_name": "bitsail_fake_to_selectdb_test", "instance_id": -20413, "user_name": "user" }, "reader": { "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource", "total_count": 300, "rate": 10000, "random_null_rate": 0, "unique_fields": "id", "columns_with_fixed_value": [ { "name": "partition_date", "fixed_value": "2022-10-10" } ], "columns": [ { "index": 0, "name": "id", "type": "long" }, { "index": 1, "name": "bigint_type", "type": "long" }, { "index": 2, "name": "string_type", "type": "string" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date.date" }, { "index": 6, "name": "partition_date", "type": "string" } ] }, "writer": { "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080", "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030", "cluster_name": "new_cluster", "user": "admin", "password": "****", "table_identifier": "test_db.test_table", "columns": [ { "index": 0, "name": "id", "type": "bigint" }, { "index": 1, "name": "bigint_type", "type": "bigint" }, { "index": 2, "name": "string_type", "type": "varchar" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date" }, { "index": 6, "name": "partition_date", "type": "date" } ] } } }
命令行提交任务。
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json