数据写入AnalyticDB

本文以在DLA中读取OSS中的数据(dla_table_1、dla_table_2表),并通过INSERT…SELECT将符合条件的数据写入AnalyticDB(shipping、order_table)为例,介绍如何通过DLA向AnalyticDB中写入数据。

操作步骤

步骤一:创建OSS Schema

 -- Register a DLA schema on the OSS catalog; tables created in it read
 -- from the bucket directory given in LOCATION.
 CREATE SCHEMA oss_data_schema WITH DBPROPERTIES (
     CATALOG = 'oss',
     LOCATION = 'oss://oss_bucket_name/table/'
 );

步骤二:创建OSS表

dla_table_1

-- External table over '|'-delimited text files in OSS (shipping records).
-- NOTE(review): dla_table_2 in this document declares the same LOCATION;
-- confirm that each table should point at its own data directory.
CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
    id                bigint  NOT NULL COMMENT '',
    origin_state      varchar NOT NULL COMMENT '',
    origin_zip        varchar NOT NULL COMMENT '',
    destination_state varchar NOT NULL COMMENT '',
    destination_zip   varchar NOT NULL COMMENT '',
    package_weight    int     NOT NULL COMMENT ''
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://oss_bucket_name/table/';

dla_table_2

-- External table over '|'-delimited text files in OSS (order records).
-- order_season is the only nullable column.
-- NOTE(review): dla_table_1 in this document declares the same LOCATION;
-- confirm that each table should point at its own data directory.
CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
    customer_id  bigint  NOT NULL COMMENT '',
    order_id     varchar NOT NULL COMMENT '',
    order_time   date    NOT NULL COMMENT '',
    order_amount double  NOT NULL COMMENT '',
    order_type   varchar NOT NULL COMMENT '',
    address      varchar NOT NULL COMMENT '',
    city         varchar NOT NULL COMMENT '',
    order_season bigint           COMMENT '',
    PRIMARY KEY (customer_id)
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://oss_bucket_name/table/';

步骤三:创建AnalyticDB Schema

-- Register a DLA schema on the AnalyticDB ('ads') catalog.
-- LOCATION is the JDBC endpoint of the ads_database database; USER and
-- PASSWORD are placeholders for the AnalyticDB AccessKey pair.
CREATE SCHEMA ads_database_schema WITH DBPROPERTIES (
    CATALOG = 'ads',
    LOCATION = 'jdbc:mysql://ads-database-********-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/ads_database',
    USER = 'AnalyticDB AccessKey ID',
    PASSWORD = 'AnalyticDB AccessKey Secret'
);

步骤四:创建AnalyticDB表

shipping

-- DLA-side external table for the AnalyticDB shipping table.
-- No TBLPROPERTIES mapping is declared, so the table and column names are
-- presumably resolved to identically named objects in AnalyticDB — confirm.
CREATE EXTERNAL TABLE shipping (
    id                bigint  NOT NULL COMMENT '',
    origin_state      varchar NOT NULL COMMENT '',
    origin_zip        varchar NOT NULL COMMENT '',
    destination_state varchar NOT NULL COMMENT '',
    destination_zip   varchar NOT NULL COMMENT '',
    package_weight    int     NOT NULL COMMENT '',
    PRIMARY KEY (id)
);

order_table1

-- DLA-side external table whose table and column names differ from the
-- AnalyticDB target; TBLPROPERTIES maps them onto ads_database.order_table.
-- NOTE(review): order1_id is bigint here while the OSS source column
-- dla_table_2.order_id is varchar — confirm the intended type.
CREATE EXTERNAL TABLE order_table1 (
    customer1_id  bigint  NOT NULL COMMENT '',
    order1_id     bigint  NOT NULL COMMENT '',
    order1_time   date    NOT NULL COMMENT '',
    order1_amount double  NOT NULL COMMENT '',
    order1_type   varchar NOT NULL COMMENT '',
    address1      varchar NOT NULL COMMENT '',
    city1         varchar NOT NULL COMMENT '',
    order1_season bigint           COMMENT '',
    PRIMARY KEY (customer1_id)
)
TBLPROPERTIES (
    -- Target table in AnalyticDB, written as database.table.
    table_mapping = 'ads_database.order_table',
    -- One 'dla_column,target_column' pair per entry, entries separated by ';'.
    column_mapping = 'customer1_id,customer_id; order1_id,order_id; order1_time,order_time;
    order1_amount,order_amount; order1_type,order_type; address1,address;
    city1,city; order1_season,order_season'
);

步骤五:执行INSERT…SELECT将OSS中的数据插入AnalyticDB

异步执行以下SQL,将OSS中table目录下dla_table_1文件中的数据插入AnalyticDB中ads_database数据库中的shipping表。

-- Full load of the OSS data into AnalyticDB, submitted asynchronously.
-- Columns are listed explicitly on both sides so the load does not depend
-- on the two tables keeping the same column order.
/*+run-async=true*/
INSERT INTO ads_database_schema.shipping (
    id,
    origin_state,
    origin_zip,
    destination_state,
    destination_zip,
    package_weight
)
SELECT
    id,
    origin_state,
    origin_zip,
    destination_state,
    destination_zip,
    package_weight
FROM oss_data_schema.dla_table_1;

异步执行以下SQL,将OSS中table目录下dla_table_2文件中order_amount > 2的数据(最多10000行)插入AnalyticDB中ads_database数据库中的order_table表。

-- Filtered load from OSS into AnalyticDB, submitted asynchronously:
-- only rows with order_amount > 2 are copied, capped at 10000 rows.
-- There is no ORDER BY, so which rows fall under the cap is up to the engine.
/*+run-async=true*/
INSERT INTO ads_database_schema.order_table1 (
    customer1_id,
    order1_id,
    order1_time,
    order1_amount,
    order1_type,
    address1,
    city1,
    order1_season
)
SELECT
    customer_id,
    order_id,
    order_time,
    order_amount,
    order_type,
    address,
    city,
    order_season
FROM oss_data_schema.dla_table_2
WHERE order_amount > 2
LIMIT 10000;