云数据库 SelectDB 版支持表函数功能TVF(Table-Value-Function),可以将S3、HDFS等常见远端存储中的文件数据,映射成云数据库 SelectDB 版中的表,从而对这些文件数据进行分析。
S3表函数
S3表函数可以让您像访问关系型数据库一样,读取并访问S3兼容的对象存储上的文件内容。目前支持CVS、CSV_with_names、CVS_with_names_and_types、Json、Parquet、ORC文件格式。
语法
s3(
"uri" = "..",
"s3.access_key" = "...",
"s3.secret_key" = "...",
"s3.region" = "...",
"format" = "csv",
"keyn" = "valuen",
...
);
参数说明
S3 TVF中的每一个参数都是一个"key"="value"
键值对。常用参数如下:
参数名称 | 必选 | 参数取值 | 参数说明 |
uri | 是 | 字符串。 | 访问S3的URI。 |
s3.access_key | 是 | 字符串。 | 访问S3的用户身份密钥。 |
s3.secret_key | 是 | 字符串。 | 访问S3的用户加密认证字符串。 |
s3.region | 是 | 字符串。 | 访问S3的对象存储所在地域。默认值 |
s3.session_token | 否 说明 若启用临时会话验证,则该参数必选。 | 字符串。 | 访问S3的用户临时会话token。 |
use_path_style | 否 |
| S3 SDK默认使用Virtual-hosted Style方式。但某些对象存储系统可能未开启或不支持Virtual-hosted Style方式的访问,此时您可以添加 默认为 说明 URI目前支持三种schema:
|
format | 是 |
| 访问S3上的文件具体格式。 |
column_separator | 否 | 字符串。 | 列分割符,默认为 |
line_delimiter | 否 | 字符串。 | 行分割符,默认为 |
compress_type | 否 |
| 文件的压缩格式,会根据uri的后缀自动推断类型。默认值为UNKNOWN。 |
read_json_by_line | 否 |
| 是否以行为单位读取JSON数据,默认为true。 |
num_as_string | 否 |
| 数字类型按照String处理,默认为false。 |
fuzzy_parse | 否 |
| 加速JSON数据的导入效率,默认为false。 |
jsonpaths | 否 | 字符串。 | 当导入数据格式为JSON时,可通过 格式: |
strip_outer_array | 否 |
| 当导入数据格式为JSON时, 格式: |
json_root | 否 | 字符串。 | 当导入数据格式为JSON时,可以通过 格式: |
path_partition_keys | 否 | 字符串。 | 指定文件路径中携带的分区列名。例如/path/to/city=beijing/date="2023-07-09",则填写path_partition_keys="city,date"。此时SelectDB将会自动从路径中读取相应列名和列值进行导入。 |
使用示例
读取并访问S3兼容的对象存储上的CSV格式文件,示例如下。
SELECT * FROM s3("uri" = "http://127.0.0.1:9312/test2/student1.csv", "s3.access_key"= "minioadmin", "s3.secret_key" = "minioadmin", "format" = "csv", "use_path_style" = "true") ORDER BY c1;
配合
DESC FUNCTION
使用,示例如下。MySQL [(none)]> Desc FUNCTION s3("uri" = "http://127.0.0.1:9312/test2/student1.csv", "s3.access_key"= "minioadmin", "s3.secret_key" = "minioadmin", "format" = "csv", "use_path_style" = "true");
OSS的场合,需采用Virtual-hosted Style方式访问,示例如下。
SELECT * FROM s3( "uri" = "http://example-bucket.oss-cn-beijing.aliyuncs.com/your-folder/file.parquet", "s3.access_key"= "ak", "s3.secret_key" = "sk", "format" = "parquet", "use_path_style" = "false");
设置了
"use_path_style"="true"
的场合,将采用path style方式访问S3,示例如下。SELECT * FROM s3( "uri" = "https://endpoint/bucket/file/student.csv", "s3.access_key"= "ak", "s3.secret_key" = "sk", "format" = "csv", "use_path_style"="true");
设置了
"use_path_style"="false"
的场合,将采用virtual-hosted style方式访问S3,示例如下。SELECT * FROM s3( "uri" = "https://bucket.endpoint/bucket/file/student.csv", "s3.access_key"= "ak", "s3.secret_key" = "sk", "format" = "csv", "use_path_style"="false");
HDFS表函数
HDFS表函数可以让您像访问关系表格式数据一样,读取并访问HDFS上的文件内容。目前支持CVS、CSV_with_names、CVS_with_names_and_types、Json、Parquet、ORC文件格式。
语法
hdfs(
"uri" = "..",
"fs.defaultFS" = "...",
"hadoop.username" = "...",
"format" = "csv",
"keyn" = "valuen"
...
);
参数说明
HDFS TVF中的每一个参数都是一个"key"="value"
键值对。常用参数如下:
参数名称 | 必选 | 参数取值 | 参数说明 |
uri | 是 | 字符串。 | 要访问的HDFS的URI。如果URI路径不存在或文件都是空文件,HDFS TVF将返回空集合。 |
fs.defaultFS | 是 | 字符串。 | 要访问的HDFS的主机和端口号。 |
hadoop.username | 是 | 字符串。 | 要访问的HDFS用户名,可以是任意字符串,但不能为空。 |
hadoop.security.authentication | 否 |
| 要访问的HDFS认证方式。可选Simple或者Kerberos。 |
hadoop.kerberos.principal | 否 | 字符串。 | 要访问的HDFS启用Kerberos验证的场合,指定Principal。 |
hadoop.kerberos.keytab | 否 | 字符串。 | 要访问的HDFS启用Kerberos验证的场合,指定Keytab。 |
dfs.client.read.shortcircuit | 否 |
| HDFS短路本地读取开关。布尔类型。 |
dfs.domain.socket.path | 否 | 字符串。 | 一个指向UNIX域套接字的路径,用于DataNode和本地HDFS客户端通信。如果在该路径中出现了字符串"_PORT",会被替换成DataNode的TCP端口。可选参数。 |
dfs.nameservices | 否 | 字符串。 | 提供服务的NS逻辑名称,与core-site.xml里相应字段对应。 |
dfs.ha.namenodes.your-nameservices | 否 说明 采用Hadoop HA部署的场合为必选参数。 | 字符串。 | dfs.nameservices下的NameNode逻辑名称。 |
dfs.namenode.rpc-address.your-nameservices.your-namenode | 否 说明 采用Hadoop HA部署的场合为必选参数。 | 字符串。 | 每个NameNode监听的HTTP地址。 |
dfs.client.failover.proxy.provider.your-nameservices | 否 说明 采用Hadoop HA部署的场合为必选参数。 | 字符串。 | 客户端连接可用状态的NameNode所用的代理类。 |
read_json_by_line | 否 |
| 以行为单位读取JSON数据,默认为true。 |
num_as_string | 否 |
| 数字类型按照String处理,默认为false。 |
fuzzy_parse | 否 |
| 加速JSON数据的导入效率,默认为false。 |
jsonpaths | 否 | 字符串。 | 当导入数据格式为JSON时,可通过 格式: |
strip_outer_array | 否 |
| 当导入数据格式为JSON时, 格式: |
json_root | 否 | 字符串。 | 当导入数据格式为JSON时,可以通过 格式: |
trim_double_quotes | 否 |
| 布尔类型,默认值为false。为true时表示裁剪掉CSV文件每个字段最外层的双引号。 |
skip_lines | 否 | [0-Integer.MaxValue] | 整数类型,默认值为0。含义为跳过CSV文件的前几行。当format设置为 |
path_partition_keys | 否 | 字符串。 | 指定文件路径中携带的分区列名。例如/path/to/city=beijing/date="2023-07-09",则填写path_partition_keys="city,date"。此时SelectDB将会自动从路径中读取相应列名和列值进行导入。 |
使用示例
读取并访问HDFS存储上的CSV格式文件,示例如下。
MySQL [(none)]> SELECT * FROM hdfs(
"uri" = "hdfs://127.0.0.1:842/user/doris/csv_format_test/student.csv",
"fs.defaultFS" = "hdfs://127.0.0.1:8424",
"hadoop.username" = "doris",
"format" = "csv");
-- 返回示例
+------+---------+------+
| c1 | c2 | c3 |
+------+---------+------+
| 1 | alice | 18 |
| 2 | bob | 20 |
| 3 | jack | 24 |
| 4 | jackson | 19 |
| 5 | liming | 18 |
+------+---------+------+
读取并访问HA模式的HDFS存储上的CSV格式文件,示例如下。
MySQL [(none)]> SELECT * FROM hdfs(
"uri" = "hdfs://127.0.0.1:842/user/doris/csv_format_test/student.csv",
"fs.defaultFS" = "hdfs://127.0.0.1:8424",
"hadoop.username" = "doris",
"format" = "csv",
"dfs.nameservices" = "my_hdfs",
"dfs.ha.namenodes.my_hdfs" = "nn1,nn2",
"dfs.namenode.rpc-address.my_hdfs.nn1" = "nanmenode01:8020",
"dfs.namenode.rpc-address.my_hdfs.nn2" = "nanmenode02:8020",
"dfs.client.failover.proxy.provider.my_hdfs" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
-- 返回示例
+------+---------+------+
| c1 | c2 | c3 |
+------+---------+------+
| 1 | alice | 18 |
| 2 | bob | 20 |
| 3 | jack | 24 |
| 4 | jackson | 19 |
| 5 | liming | 18 |
+------+---------+------+
配合DECS FUNCTION
使用,示例如下。
MySQL [(none)]> DECS FUNCTION hdfs(
"uri" = "hdfs://127.0.0.1:8424/user/doris/csv_format_test/student_with_names.csv",
"fs.defaultFS" = "hdfs://127.0.0.1:8424",
"hadoop.username" = "doris",
"format" = "csv_with_names");
使用方法
本章节将通过S3 TVF举例说明如何进行文件分析。
自动推断文件列类型
目前支持对Parquet、ORC、CSV、JSON格式进行分析和列类型推断。
> DESC FUNCTION s3 (
"URI" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style"="true"
);
-- 返回示例
+---------------+--------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-------+---------+-------+
| p_partkey | INT | Yes | false | NULL | NONE |
| p_name | TEXT | Yes | false | NULL | NONE |
| p_mfgr | TEXT | Yes | false | NULL | NONE |
| p_brand | TEXT | Yes | false | NULL | NONE |
| p_type | TEXT | Yes | false | NULL | NONE |
| p_size | INT | Yes | false | NULL | NONE |
| p_container | TEXT | Yes | false | NULL | NONE |
| p_retailprice | DECIMAL(9,0) | Yes | false | NULL | NONE |
| p_comment | TEXT | Yes | false | NULL | NONE |
+---------------+--------------+------+-------+---------+-------+
可以看到,对于Parquet文件,SelectDB会根据文件内的元信息自动推断列类型。
CSV Schema
默认情况下,CSV格式文件的所有列类型均会被SelectDB视为String。您可以通过csv_schema
属性单独指定列名和列类型。SelectDB会使用指定的列类型进行文件读取。格式如下:
name1:type1;name2:type2;...
对于格式不匹配的列(例如文件中实际为字符串,您定义为int),或缺失列(比如文件中实际存在4列,您定义了5列),则这些列将返回null。当前支持的列类型为:
名称 | 映射类型 |
tinyint | tinyint |
smallint | smallint |
int | int |
bigint | bigint |
largeint | largeint |
float | float |
double | double |
decimal(p,s) | decimalv3(p,s) |
date | datev2 |
datetime | datetimev2 |
char | string |
varchar | string |
string | string |
boolean | boolean |
示例如下。
s3 (
"URI" = "https://bucket1/inventory.dat",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "csv",
"column_separator" = "|",
"csv_schema" = "k1:int;k2:int;k3:int;k4:decimal(38,10)",
"use_path_style"="true"
)
查询分析
您可以使用任意的SQL语句,对TVF进行分析,示例如下。
SELECT * FROM s3(
"URI" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style"="true")
LIMIT 5;
-- 返回示例
+-----------+------------------------------------------+----------------+----------+-------------------------+--------+-------------+---------------+---------------------+
| p_partkey | p_name | p_mfgr | p_brand | p_type | p_size | p_container | p_retailprice | p_comment |
+-----------+------------------------------------------+----------------+----------+-------------------------+--------+-------------+---------------+---------------------+
| 1 | goldenrod lavender spring chocolate lace | Manufacturer#1 | Brand#13 | PROMO BURNISHED COPPER | 7 | JUMBO PKG | 901 | ly. slyly ironi |
| 2 | blush thistle blue yellow saddle | Manufacturer#1 | Brand#13 | LARGE BRUSHED BRASS | 1 | LG CASE | 902 | lar accounts amo |
| 3 | spring green yellow purple cornsilk | Manufacturer#4 | Brand#42 | STANDARD POLISHED BRASS | 21 | WRAP CASE | 903 | egular deposits hag |
| 4 | cornflower chocolate smoke green pink | Manufacturer#3 | Brand#34 | SMALL PLATED BRASS | 14 | MED DRUM | 904 | p furiously r |
| 5 | forest brown coral puff cream | Manufacturer#3 | Brand#32 | STANDARD POLISHED TIN | 15 | SM PKG | 905 | wake carefully |
+-----------+------------------------------------------+----------------+----------+-------------------------+--------+-------------+---------------+---------------------+
TVF可以出现在SQL中Table能出现的任意位置,如CTE的WITH子句中或者FROM子句中,您可以把文件当做一张普通的表进行任意分析。
您也可以通过CREATE VIEW
语句为TVF创建一个逻辑视图。这样您可以像其他视图一样,对这个TVF进行访问、权限管理等操作,也可以让其他用户访问这个TVF。
CREATE VIEW v1 AS
SELECT * FROM s3(
"URI" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style"="true");
DESC v1;
SELECT * FROM v1;
GRANT SELECT_PRIV ON db1.v1 TO user1;
数据导入
配合INSERT INTO SELECT
语法,您可以方便将文件导入到SelectDB表中进行更快速的分析,示例如下。
-- 1. 创建SelectDB内部表
CREATE TABLE IF NOT EXISTS test_table
(
id int,
name varchar(50),
age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");
-- 2. 使用S3 Table Value Function插入数据
INSERT INTO test_table (id,name,age)
SELECT cast(id as INT) as id, name, cast (age as INT) as age
FROM s3(
"uri" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style" = "true");
注意事项
如果S3或HDFS TVF指定的URI匹配不到文件,或者匹配到的所有文件都是空文件,那么S3或HDFS TVF将会返回空结果集。在这种情况下使用
DESC FUNCTION
查看这个文件的Schema,会得到一列虚假的列__dummy_col
,可忽略这一列。如果指定TVF的format为CSV,所读文件不为空文件但文件第一行为空,则会产生报误
The first line is empty, can not parse column numbers
。