本文介绍使用DataX Doris Writer同步数据至云数据库 SelectDB 版。
概述
DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。您可以通过DataX服务读取上游数据,然后由DataX Doris Writer将数据写入到云数据库 SelectDB 版。
前提条件
已安装Maven环境。
已安装Python3.6及以上版本。
使用示例
如下以MySQL数据源为例,介绍在Linux环境下如何通过DataX将MySQL数据导入至云数据库 SelectDB 版。
步骤一:配置DataX环境
下载DataX程序包代码,插件代码下载请访问Doris社区。
运行DataX程序包中的init-env.sh脚本,构建DataX开发环境。
sh init-env.sh
编译mysqlreader和doriswriter。
编译整个DataX项目。
cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=true
编译产出结果在target/datax/datax/.目录下。
说明hdfsreader,hdfswriter,ossreader和osswriter这四个插件需要额外的jar包,如果不需要这些插件,可以在DataX/pom.xml中删除这些插件的模块。
单独编译mysqlreader和selectdbwriter插件。
mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
步骤二:构造需要导入的数据
创建MySQL测试表。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
使用DMS构建测试数据,详情请参见测试数据构建。
步骤三:配置云数据库 SelectDB 版实例。
通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例。
创建测试数据库和测试表。
创建测试数据库。
CREATE DATABASE test_db;
创建测试表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
步骤四:通过DataX服务同步MySQL数据到SelectDB
开通云数据库 SelectDB 版公网地址,详情请参见申请和释放公网地址。
将DataX主机的公网IP添加到IP白名单中,详情请参见设置白名单。
创建配置文件
mysqlToSelectDB.json
,配置任务信息。{ "job":{ "content":[ { "reader":{ "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": [ "employees" ] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer":{ "name":"doriswriter", "parameter":{ "loadUrl":[ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps":{ "format":"json", "strip_outer_array":"true" }, "column":[ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username":"admin", "password":"123456", "postSql":[ ], "preSql":[ ], "connection":[ { "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table":[ "employees" ], "selectedDatabase":"test_db" } ], "maxBatchRows":1000000, "batchSize":536870912000 } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "channel":5 } } } }
参数说明
参数
是否必填
默认值
描述
jdbcUrl
是
无
JDBC连接URL
jdbc:mysql://<ip>:<port>
。您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和MySQL协议端口。
IP地址:VPC地址或公网地址。
端口:MySQL协议端口。
示例:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030
说明如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。
loadUrl
是
无
SelectDB的HTTP协议访问地址
<ip>:<port>
。您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取IP地址和HTTP协议端口。
IP地址:VPC地址或公网地址。
端口:HTTP协议端口。
示例:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080
说明如果DataX主机和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下请使用公网地址。
username
是
无
云数据库 SelectDB 版实例的用户名。
password
是
无
云数据库 SelectDB 版实例对应用户的密码。
connection.selectedDatabase
是
无
需要写入的云数据库 SelectDB 版数据库名称。
connection.table
是
无
需要写入的云数据库 SelectDB 版表名称。
column
是
无
目的表需要写入数据的字段,这些字段将作为生成的JSON数据的字段名。字段之间用英文逗号分隔。示例:
"column": ["id","name","age"]
。preSql
否
无
写入数据到目的表前,会先执行这里的标准语句。
postSql
否
无
写入数据到目的表后,会执行这里的标准语句。
maxBatchRows
否
500000
每批次导入数据的最大行数。和batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。
batchSize
否
104857600
每批次导入数据的最大数据量。和maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。默认值为100 M。
maxRetries
否
3
每批次导入数据失败后的重试次数。
labelPrefix
否
datax_doris_writer_
每批次上传文件的label前缀。最终的label将由 'labelPrefix + UUID'组成全局唯一的label,确保数据不会重复导入。
loadProps
否
无
与Stream Load的请求参数相同。详情请参见Stream Load参数说明。配置导入数据格式使用参数format,导入数据格式默认使用CSV,支持JSON,详情请参考类型转换。
flushInterval
否
30000
数据写入批次的时间间隔。默认为30000ms
命令行提交任务。
cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
类型转换
默认传入的数据均会被转为字符串,并以\t
作为列分隔符,\n
作为行分隔符,组成csv
文件进行SelectDB导入操作。 默认是csv格式导入,如需更改列分隔符, 则正确配置loadProps
即可,示例如下。
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}
如需更改导入格式为JSON
,则正确配置loadProps
下的format
即可,示例如下。
"loadProps": {
"format": "json",
"strip_outer_array": true
}