Iceberg数据源

本文介绍云数据库 SelectDB 版如何通过Catalog集成Iceberg数据源,对Iceberg数据源进行联邦分析。

前提条件

  • 已将Iceberg集群所有节点IP添加至SelectDB的白名单。具体操作,请参见设置白名单

  • 了解什么是Catalog,以及Catalog的基本操作。具体信息,请参见湖仓一体

注意事项

  • 支持Iceberg V1、V2表格式。

    说明

    V2表格式仅支持Position Delete方式,不支持Equality Delete

  • 目前,云数据库 SelectDB 版对External Catalog中的数据只支持读操作。

操作步骤

本示例中,iceberg_catalog为目标External Catalog,根据您的实际情况,替换成自己的参数。

步骤一:连接实例

连接SelectDB实例。具体操作,请参见通过MySQL客户端连接云数据库SelectDB版实例

步骤二:集成Iceberg

SelectDB通过创建External Catalog集成外部数据源,不同的Catalog类型,其访问Iceberg元数据的API不同,根据您的具体需求,选择合适的Catalog类型。

基于Hive API访问

如果您的Iceberg构建在Hive之上,并期望SelectDB调用Hive的API获取Iceberg元数据,您可以创建hms类型的Catalog进行集成Iceberg。具体操作与SelectDB集成Hive相同,此处仅提供简单示例,更多详情,请参见Hive数据源

CREATE CATALOG iceberg_catalog PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
    'hadoop.username' = 'hive',
    'dfs.nameservices'='your-nameservice',
    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

基于Iceberg API访问

如果您期望SelectDB调用Iceberg的API获取Iceberg元数据,您可以根据Iceberg的服务类型,选择合适的Catalog类型。

支持调用Iceberg的API获取Iceberg元数据的服务有Hadoop File System、Hive、REST、DLF等。

重要

创建Iceberg Catalog中dfs相关的参数与Iceberg配置文件hdfs-site.xml中的参数是一一映射关系,参数和参数含义都相同。所以此处dfs相关参数值均需与配置文件hdfs-site.xml中的值保持一致。

Hadoop Catalog

-- 非HA集群
CREATE CATALOG iceberg_hadoop PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type' = 'hadoop',
    'warehouse' = 'hdfs://your-host:8020/dir/key'
);
-- HA集群
CREATE CATALOG iceberg_hadoop_ha PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type' = 'hadoop',
    'warehouse' = 'hdfs://your-nameservice/dir/key',
    'dfs.nameservices'='your-nameservice',
    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

如果您的数据存放在对象存储(OSS)中,需要在PROPERTIES中配置以下参数。

"oss.access_key" = "ak"
"oss.secret_key" = "sk"
"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
"oss.region" = "oss-cn-beijing"
参数说明
说明

如果您的数据存放在对象存储(OSS)中,OSS相关参数为必选参数。

参数

是否必选

说明

type

Catalog类型,固定为iceberg。

warehouse

仓库的HDFS路径。

iceberg.catalog.type

Iceberg的catalog类型。固定为hadoop。

dfs.nameservices

命名服务的名称。

dfs.ha.namenodes.[nameservice ID]

NameNode的ID列表。

dfs.namenode.rpc-address.[nameservice ID].[name node ID]

NameNode的RPC地址,数量与NameNode数量相同。

dfs.client.failover.proxy.provider.[nameservice ID]

HDFS客户端连接活跃NameNode的Java类,通常是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

oss.region

访问OSS数据的地域。

oss.endpoint

访问OSS数据的Endpoint。如何获取,请参见访问域名和数据中心

oss.access_key

访问OSS数据的Accesskey ID

oss.secret_key

访问OSS数据的AccessKey Secret

Hive Metastore

CREATE CATALOG iceberg_catalog PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='hms',
    'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
    'hadoop.username' = 'hive',
    'dfs.nameservices'='your-nameservice',
    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

如果您的数据存放在对象存储(OSS)中,需要在PROPERTIES中配置以下参数。

"oss.access_key" = "ak"
"oss.secret_key" = "sk"
"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
"oss.region" = "oss-cn-beijing"
参数说明
说明

如果您的数据存放在对象存储(OSS)中,OSS相关参数为必选参数。

参数

是否必选

说明

type

Catalog类型,固定为iceberg。

iceberg.catalog.type

Iceberg的Catalog类型。固定为hms。

hive.metastore.uris

Hive的metastore服务器URI。

hadoop.username

HDFS用户名。

dfs.nameservices

Name Service名称。

dfs.ha.namenodes.[nameservice ID]

NameNode的ID列表。

dfs.namenode.rpc-address.[nameservice ID].[name node ID]

NameNode的RPC地址,数量与NameNode数量相同。

dfs.client.failover.proxy.provider.[nameservice ID]

HDFS客户端连接活跃NameNode的Java类,通常是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

oss.region

访问OSS数据的地域。

oss.endpoint

访问OSS数据的Endpoint。如何获取,请参见访问域名和数据中心

oss.access_key

访问OSS数据的Accesskey ID

oss.secret_key

访问OSS数据的AccessKey Secret

REST Catalog

该方式需要预先提供REST服务,您需实现获取Iceberg元数据的REST接口。

CREATE CATALOG iceberg PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='rest',
    'uri' = 'http://172.21.0.1:8181'
);

如果您使用HDFS存储数据,并开启了高可用模式,还需在Catalog中增加HDFS高可用配置。

CREATE CATALOG iceberg PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='rest',
    'uri' = 'http://172.21.0.1:8181',
    'dfs.nameservices'='your-nameservice',
    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.1:8020',
    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.2:8020',
    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
参数说明

参数

是否必选

说明

type

Catalog类型,固定为iceberg。

uri

REST服务的URI。

iceberg.catalog.type

Iceberg的catalog类型。固定为rest。

dfs.nameservices

Name Service名称。

dfs.ha.namenodes.[nameservice ID]

NameNode的ID列表。

dfs.namenode.rpc-address.[nameservice ID].[name node ID]

NameNode的RPC地址,数量与NameNode数量相同,与hdfs-site.xml保持一致。

dfs.client.failover.proxy.provider.[nameservice ID]

HDFS客户端连接活跃NameNode的Java类,通常是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

步骤三:查看Catalog

您可以通过以下语句,查看Catalog创建是否成功。

SHOW CATALOGS; --查看CATALOG是否创建成功
+--------------+--------------+----------+-----------+-------------------------+---------------------+------------------------+
| CatalogId    | CatalogName  | Type     | IsCurrent | CreateTime              | LastUpdateTime      | Comment                |
+--------------+--------------+----------+-----------+-------------------------+---------------------+------------------------+
| 436009309195 | iceberg_catalog | jdbc      |           | 2024-08-06 17:09:08.058 | 2024-07-19 18:04:37 |                        |
|            0 | internal     | internal | yes       | UNRECORDED              | NULL                | Doris internal catalog |
+--------------+--------------+----------+-----------+-------------------------+---------------------+------------------------+

步骤四:查看Iceberg库表数据

  • 在目标External Catalog目录下查看Iceberg库表数据。

    说明

    连接SelectDB实例后,默认操作目录为Internal Catalog。

    1. 切换目录至目标目录External Catalog。

      SWITCH iceberg_catalog;
    2. 查看数据。

      完成目标目录切换后,您可以像使用Internal Catalog一样,对External Catalog的数据进行查看和访问。例如以下操作:

      • 查看数据库列表:SHOW DATABASES;

      • 切换数据库:USE test_db;

      • 查看数据库表列表:SHOW TABLES;

      • 查看表数据。

        在Iceberg中,每次对表的写操作都会产生一个新的快照(Snapshot)。

        默认情况下,SelectDB的读请求只会读取Iceberg最新版本的快照,您可以使用FOR time AS OF FOR version AS OF语句,根据快照ID或者快照产生的时间读取历史版本的数据。示例如下:

        -- 查询表test_t最新快照版本的数据。
        SELECT * FROM test_t;
        -- 查询表test_t指定时间的数据。
        SELECT * FROM test_t FOR TIME AS OF "2022-10-07 17:20:37";
        -- 查询指定快照ID对应表test_t的数据。
        SELECT * FROM test_t FOR VERSION AS OF 868895038****72;

        如果您不知道iceberg表的各类元数据信息,如操作历史、生成的快照、文件元数据等,可通过表函数iceberg_meta获取,更多表函数iceberg_meta详情,请参见ICEBERG_META

  • 在Internal Catalog目录下查看Iceberg库表数据。

    --查看iceberg_catalog目录下,数据库test_db中表test_t的最新快照版本的数据。
    SELECT * FROM iceberg_catalog.test_db.test_t;
    -- 查询指定时间test_t的数据。
    SELECT * FROM iceberg_catalog.test_db.test_t FOR TIME AS OF "2022-10-07 17:20:37";
    -- 查询指定快照ID对应的test_t表的数据。
    SELECT * FROM iceberg_catalog.test_db.test_t FOR VERSION AS OF 868895038****72;

更多操作:迁移数据

完成数据源集成后,如果你需要将Iceberg的历史数据迁移至SelectDB中,您可通过内联语法进行历史数据迁移,具体操作,请参见Insert Into

类型映射

Iceberg数据源和SelectDB的列映射关系,与Hive数据源和SelectDB的映射关系一致。映射详情,请参见Hive数据源