本文以在DLA中读取OSS中的数据(dla_table_1、dla_table_2表),并通过INSERT…SELECT将符合条件的数据写入AnalyticDB(shipping、order_table)为例,介绍如何通过DLA向AnalyticDB中写入数据。
操作步骤
步骤一:创建OSS Schema
CREATE SCHEMA oss_data_schema with DBPROPERTIES(
catalog='oss',
location = 'oss://oss_bucket_name/table/'
);
步骤二:创建OSS表
dla_table_1
CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
id bigint NOT NULL COMMENT '',
origin_state varchar NOT NULL COMMENT '',
origin_zip varchar NOT NULL COMMENT '',
destination_state varchar NOT NULL COMMENT '',
destination_zip varchar NOT NULL COMMENT '',
package_weight int NOT NULL COMMENT ''
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://oss_bucket_name/table/';
dla_table_2
CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
customer_id bigint NOT NULL COMMENT '',
order_id varchar NOT NULL COMMENT '',
order_time date NOT NULL COMMENT '',
order_amount double NOT NULL COMMENT '',
order_type varchar NOT NULL COMMENT '',
address varchar NOT NULL COMMENT '',
city varchar NOT NULL COMMENT '',
order_season bigint COMMENT '',
PRIMARY KEY (customer_id)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://oss_bucket_name/table/';
步骤三:创建AnalyticDB Schema
CREATE SCHEMA ads_database_schema WITH DBPROPERTIES
(
CATALOG = 'ads',
LOCATION = 'jdbc:mysql://ads-database-********-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/ads_database',
USER='AnalyticDB AccessKey ID',
PASSWORD='AnalyticDB AccessKey Secret'
);
步骤四:创建AnalyticDB表
shipping
CREATE EXTERNAL TABLE shipping (
id bigint NOT NULL COMMENT '',
origin_state varchar NOT NULL COMMENT '',
origin_zip varchar NOT NULL COMMENT '',
destination_state varchar NOT NULL COMMENT '',
destination_zip varchar NOT NULL COMMENT '',
package_weight int NOT NULL COMMENT '',
PRIMARY KEY (id)
)
order_table1
CREATE EXTERNAL TABLE order_table1 (
customer1_id bigint NOT NULL COMMENT '',
order1_id bigint NOT NULL COMMENT '',
order1_time date NOT NULL COMMENT '',
order1_amount double NOT NULL COMMENT '',
order1_type varchar NOT NULL COMMENT '',
address1 varchar NOT NULL COMMENT '',
city1 varchar NOT NULL COMMENT '',
order1_season bigint COMMENT '',
PRIMARY KEY (customer1_id)
)
tblproperties (
table_mapping = 'ads_database.order_table',
column_mapping = 'customer1_id,customer_id; order1_id,order_id; order1_time:order_time,
order1_amount:order_amount, order1_type:order_type, address1:address,
city1:city,order1_season:order_season'
);
步骤五:执行INSERT…SELECT将OSS中的数据插入AnalyticDB
异步执行以下SQL,将OSS中table目录下dla_table_1文件中的数据插入AnalyticDB中ads_database数据库中的shipping表。
-- 执行OSS到AnalyticDB的全量数据插入
/*+run-async=true*/
INSERT INTO ads_database_schema.shipping
SELECT * FROM oss_data_schema.dla_table_1;
异步执行以下SQL,将OSS中table目录下dla_table_1文件中order_amount > 2
的数据插入AnalyticDB中ads_database数据库中的order_table表。
-- 执行OSS到AnalyticDB的数据插入,包含对OSS数据的筛选逻辑
/*+run-async=true*/
INSERT INTO ads_database_schema.order_table1 (customer1_id, order1_id, order1_time, order1_amount,order1_type,address1,city1,order1_season)
SELECT customer_id, order_id, order_time, order_amount,order_type,address,city,order_season
FROM oss_data_schema.dla_table_2
WHERE order_amount > 2
LIMIT 10000;
文档内容是否对您有帮助?