通过BitSail导入数据

云数据库 SelectDB 版集成BitSail,支持使用SelectDB Sink导入表数据至云数据库 SelectDB 版。本文将为您介绍使用SelectDB Sink同步数据至云数据库 SelectDB 版的使用方式。

概述

BitSail是一款基于分布式架构的高性能数据集成引擎,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下的全域数据集成解决方案。您可以通过BitSail引擎读取MySQL、Hive、Kafka等数据源中的海量数据,然后由SelectDB Sink将数据写入到云数据库 SelectDB 版

前提条件

BitSail 0.1.0版本及以上。

使用方式

BitSail的job.writer中配置SelectDB连接器参数,示例如下。

{ 
  "job": { 
    "writer": { 
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", 
      "load_url": "<selectdb_http_address>", 
      "jdbc_url": "<selectdb_mysql_address>", 
      "cluster_name": "<selectdb_cluster_name>", 
      "user": "<username>", 
      "password": "<password>", 
      "table_identifier": "<selectdb_table_identifier>", 
      "columns": [ 
        { 
          "index": 0, 
          "name": "id", 
          "type": "int" 
        }, 
        { 
          "index": 1, 
          "name": "bigint_type", 
          "type": "bigint" 
        }, 
        { 
          "index": 2, 
          "name": "string_type", 
          "type": "varchar" 
        }, 
        { 
          "index": 3, 
          "name": "double_type", 
          "type": "double" 
        }, 
        { 
          "index": 4, 
          "name": "date_type", 
          "type": "date" 
        } 
      ] 
    } 
  }
}

参数说明如下。

参数名称

是否必填

参数含义

class

云数据库 SelectDB 版写连接器类型,默认为:com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink

load_url

云数据库 SelectDB 版实例的访问地址和HTTP协议端口。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc_url

云数据库 SelectDB 版实例的访问地址和MySQL协议端口。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster_name

云数据库 SelectDB 版实例的集群名。实例中可能包含多个集群,可按需选择。

user

云数据库 SelectDB 版实例的用户名。

password

云数据库 SelectDB 版实例对应用户名的密码。

table_identifier

云数据库 SelectDB 版实例的表名,格式为库名.表名。示例:test_db.test_table

writer_parallelism_num

指定SelectDB写并发数量。

sink_flush_interval_ms

Upsert模式下的flush间隔,单位毫秒;默认为:5000。

sink_max_retries

写入的最大重试次数,默认为:3。

sink_buffer_size

写入buffer最大值,单位字节;默认为:1048576 (1 MB)。

sink_buffer_count

初始化buffer的数量,默认为:3。

sink_enable_delete

是否支持Delete事件同步。

sink_write_mode

写入模式,目前仅支持BATCH_UPSERT。

stream_load_properties

追加在Stream Load URL后的参数,以Map<String,String>格式组成。

load_contend_type

copy-into使用的格式。取值范围为CSV或JSON。默认JSON。

csv_field_delimiter

CSV格式的行内分隔符,默认为:逗号","。

csv_line_delimiter

CSV格式的行间分隔符,默认为:"\n"。

使用示例

下文使用BItSail构造数据源为例,为您介绍如何通过BitSail将上游数据导入至云数据库 SelectDB 版

环境准备

  1. 配置BitSail环境,下载并解压BitSail安装包。

    wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
    tar -zxvf bitsail.tar.gz
  2. 配置云数据库 SelectDB 版实例。

    1. 创建云数据库 SelectDB 版实例,详情请参见创建实例

    2. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

    3. 创建测试数据库和测试表。

      1. 创建测试数据库。

        CREATE DATABASE test_db;
      2. 创建测试表。

        CREATE TABLE `test_table` (
          `id` BIGINT(20) NULL,
          `bigint_type` BIGINT(20) NULL,
          `string_type` VARCHAR(100) NULL,
          `double_type` DOUBLE NULL,
          `decimal_type` DECIMALV3(27, 9) NULL,
          `date_type` DATEV2 NULL,
          `partition_date` DATEV2 NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
          "light_schema_change" = "true"
        );
    4. 开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址

    5. 将BitSail环境的公网IP添加到IP白名单中,详情请参见设置白名单

通过BitSail本地任务同步数据到SelectDB

  1. 创建配置文件test.json,配置任务信息。通过bitsail包中的FakeSource类构造本地数据进行导入。

    {
      "job": {
        "common": {
          "job_id": -2413,
          "job_name": "bitsail_fake_to_selectdb_test",
          "instance_id": -20413,
          "user_name": "user"
        },
        "reader": {
          "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
          "total_count": 300,
          "rate": 10000,
          "random_null_rate": 0,
          "unique_fields": "id",
          "columns_with_fixed_value": [
            {
              "name": "partition_date",
              "fixed_value": "2022-10-10"
            }
          ],
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "long"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "long"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "string"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date.date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "string"
            }
          ]
        },
        "writer": {
          "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
          "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
          "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
          "cluster_name": "new_cluster",
          "user": "admin",
          "password": "****",
          "table_identifier": "test_db.test_table",
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "bigint"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "bigint"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "varchar"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "date"
            }
          ]
        }
      }
    }
  2. 命令行提交任务。

    bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json