通过Flink导入数据

云数据库 SelectDB 版兼容Apache Doris,支持通过Flink Doris Connector,将Kafka中的非结构化数据以及MySQL等上游业务数据库中的变更数据,实时同步到云数据库 SelectDB 版中,有效地满足海量数据的分析需求。

功能简介

Flink Doris Connector是云数据库 SelectDB 版流式导入数据的常用方式。基于Flink的流处理能力,您可以将上游数据源(例如:MySQL、Oracle、PostgreSQL、SQL Server、Kafka等)中的大量数据,通过Flink Doris Connector导入到SelectDB表中。同时,您也可以使用Flink的JDBC方式来读取SelectDB表中的数据。

重要

Flink Doris Connector目前仅支持向SelectDB写数据,如果您有读取SelectDB数据的需求,请使用Flink JDBC Connector。

flink1_bd9b152917_副本

工作原理

Flink Doris Connector在接收到数据后,通过HTTP的分块传输编码(Chunked Transfer Encoding)机制,持续向SelectDB写入数据。进一步结合Flink的Checkpoint机制和Stream Load的两阶段提交,Flink Doris Connector 实现了精确一次语义(EOS,Exactly-Once Semantics),确保了端到端的数据一致性。具体原理如下:

  1. Flink任务在启动的时候,会发起一个Stream Load的PreCommit请求,此时会先开启一个事务,同时会通过HTTP的分块传输编码(Chunked)机制将数据持续发送到SelectDB,其原理如下图:

    StreamLoad-1

  2. 在Checkpoint时,结束数据写入,同时完成HTTP请求,并且将事务状态设置为预提交(PreCommitted),此时数据已经写入SelectDB,对用户不可见,其原理如下图:

    StreamLoad-2

  3. Checkpoint完成后,发起Commit Stream Load请求,并且将事务状态设置为提交(Committed),完成后数据对用户可见,其原理如下图:

    StreamLoad-3

  4. Flink任务意外故障后,从Checkpoint重启时,若上次事务为预提交(PreCommitted)状态,则会发起回滚请求,并且将事务状态设置为Aborted。基于此,可以借助Flink Doris Connector实现数据实时入库时数据不丢不重。

前提条件

Flink版本必须大于或等于1.15,并且需与Connector版本相对应。版本对应规则如下:

Flink版本

Connector类型

Connector版本

下载地址

大于或等于1.15

Flink Doris Connector

1.5.2及以上版本

Flink Doris Connector

如何引入Flink Doris Connector

您可以通过以下方式,引入Flink Doris Connector

  • 如果您需要以Maven的方式引入Flink Doris Connector,需在项目的依赖配置文件中添加以下代码。更多版本,请参见Maven仓库。下载对应版本的JAR包,并放置在FLINK_HOME/lib目录下。JAR包下载地址,请参见JAR包

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  
  • 如果您的场景是通过阿里云实时计算Flink版导入数据到SelectDB,您可以使用自定义连接器管理功能,上传、使用和更新Flink Doris Connector。如何使用自定义连接器,请参见管理自定义连接器

使用示例

如下以Flink SQL、Flink CDC和DataStream三种方式,演示如何将上游的数据同步到云数据库 SelectDB 版

环境准备

搭建Flink环境,本文以Flink 1.16单机环境为例。

  1. 下载flink-1.16.3-bin-scala_2.12.tgz,进行解压,示例如下。

    wget https://dlcdn.apache.org/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. 进入FLINK_HOME/lib目录中下载flink-sql-connector-mysql-cdc-2.4.2flink-doris-connector-1.16-1.5.2,示例如下。

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. 启动Flink Standalone集群,示例如下。

    bin/start-cluster.sh
  4. 创建云数据库 SelectDB 版实例,详情请参见创建实例

  5. 通过MySQL协议连接云数据库 SelectDB 版实例,详情请参见连接实例

  6. 创建测试数据库和测试表。

    1. 创建测试数据库。

      CREATE DATABASE test_db;
    2. 创建测试表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

通过Flink SQL方式

如下以MySQL为例,介绍如何使用Flink SQL将上游的MySQL数据导入至云数据库 SelectDB 版

  1. 启动Flink SQL Client服务,示例如下。

    bin/sql-client.sh
  2. 在Flink SQL Client上提交Flink任务,示例如下。

    SET 'execution.checkpointing.interval' = '10s';
    
    CREATE TABLE employees_source (
        emp_no INT,
        birth_date DATE,
        first_name STRING,
        last_name STRING,
        gender STRING,
        hire_date DATE,
        PRIMARY KEY (`emp_no`) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1', 
        'port' = '3306',
        'username' = 'root',
        'password' = '****',
        'database-name' = 'test',
        'table-name' = 'employees'
    );
    
    CREATE TABLE employees_sink (
        emp_no       INT ,
        birth_date   DATE,
        first_name   STRING,
        last_name    STRING,
        gender       STRING,
        hire_date    DATE
    ) 
    WITH (
      'connector' = 'selectdb-preview',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.employees',
      'username' = 'admin',
      'password' = '****',
      'sink.enable-delete' = 'true'
    );
    
    INSERT INTO employees_sink SELECT * FROM employees_source;

通过Flink CDC方式

重要

阿里云实时计算Flink版不支持JAR作业方式,后面通过CDC 3.0的YAML作业来支持。

以下介绍如何使用Flink CDC将上游数据库的数据导入至云数据库 SelectDB 版

Flink CDC的语法如下:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

参数

说明

execution.checkpointing.interval

Flink checkpoint的时间间隔,影响数据同步的频率,推荐10s。

parallelism.default

设置Flink任务的并行度,适当增加并行度可提高数据同步速度。

job-name

Flink作业名称。

database

同步到SelectDB的数据库名。

table-prefix

SelectDB表前缀名,例如 --table-prefix ods_

table-suffix

SelectDB表的后缀名。

including-tables

需要同步的表,可以使用"|"分隔多个表,并支持正则表达式。 例如--including-tables table1|tbl.*,指同步table1和所有以tbl.开头的表。

excluding-tables

不需要同步的表,配置方法与including-tables相同。

mysql-conf

MySQL CDC Source配置。详情请参见MySQL CDC Connector,其中hostnameusernamepassworddatabase-name是必选项。

oracle-conf

Oracle CDC Source配置。详情请参见Oracle CDC Connector,其中hostnameusernamepassworddatabase-nameschema-name是必选项。

sink-conf

Doris Sink的所有配置,详情请参见Sink配置项

table-conf

SelectDB表的配置项,即创建SelectDB表时properties中包含的内容。

说明
  1. 同步时需要在$FLINK_HOME/lib目录下添加对应的Flink CDC依赖,例如flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar。

  2. Flink 1.15以上的版本支持整库同步,Flink Doris Connector各个版本的下载请参见Flink Doris Connector

Sink配置项

参数

默认值

是否必填

说明

fenodes

云数据库 SelectDB 版实例的访问地址和HTTP协议端口。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和HTTP协议端口

示例:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

数据库与表名。示例:test_db.test_table

username

云数据库 SelectDB 版实例的数据库用户名。

password

请填写云数据库 SelectDB 版实例对应数据库用户名的密码。

jdbc-url

云数据库 SelectDB 版实例的JDBC连接信息。

您可以从云数据库 SelectDB 版控制台的实例详情 > 网络信息中获取VPC地址(或公网地址)和MySQL协议端口

示例:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

auto-redirect

true

是否重定向Stream Load请求。开启后Stream Load将通过FE写入,不再显示获取BE信息。

doris.request.retries

3

向SelectDB发送请求的重试次数。

doris.request.connect.timeout

30s

向SelectDB发送请求的连接超时时间。

doris.request.read.timeout

30s

向SelectDB发送请求的读取超时时间。

sink.label-prefix

""

Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。

sink.properties

Stream Load 的导入参数,请填写属性配置。

  • CSV格式时请写入:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • JSON格式时请写入:

    sink.properties.format='json' 

更多参数,请参见:Stream Load

sink.buffer-size

1048576

写数据缓存buffer大小,单位字节。不建议修改,默认配置即可,默认1 MB。

sink.buffer-count

3

写数据缓存buffer个数。不建议修改,默认配置即可。

sink.max-retries

3

提交(Commit)阶段失败后的最大重试次数,默认3次。

sink.use-cache

false

异常时,是否使用内存缓存进行恢复,开启后缓存中会保留Checkpoint期间的数据。

sink.enable-delete

true

是否同步删除事件。只支持Unique模型。

sink.enable-2pc

true

是否开启两阶段提交(2pc),默认为true,保证EOS语义。

sink.enable.batch-mode

false

是否使用攒批模式写入SelectDB,开启后写入时机不依赖Checkpoint,通过sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes和sink.buffer-flush.interval参数来控制写入时机。

同时开启后将不保证EOS语义,可借助Unique模型做到幂等。

sink.flush.queue-size

2

攒批模式下,缓存的队列大小。

sink.buffer-flush.max-rows

50000

攒批模式下,单个批次最多写入的数据行数。

sink.buffer-flush.max-bytes

10MB

攒批模式下,单个批次最多写入的字节数。

sink.buffer-flush.interval

10s

攒批模式下,异步刷新缓存的间隔。最小1s。

sink.ignore.update-before

true

是否忽略update-before事件,默认忽略。

MySQL同步示例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Oracle同步示例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

PostgreSQL同步示例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

SQL Server同步示例

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

通过DataStream方式

如下以MySQL为例,介绍如何使用DataStream将上游的MySQL数据导入至云数据库 SelectDB 版

  • maven依赖包如下,示例如下。

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter</artifactId>
                <version>RELEASE</version>
                <scope>test</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>2.0.31</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
    
        </dependencies>
  • Java核心代码,示例如下。

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
    
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs.com")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
    
            DorisOptions dorisOptions = dorisBuilder.build();
    
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
    
            DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

使用进阶

使用Flink SQL更新部分列数据

示例

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- 开启部分列更新
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

使用Flink SQL根据指定列删除数据

在上游数据源为CDC的场景中,Doris Sink会根据RowKind来区分事件的类型,对隐藏列__DORIS_DELETE_SIGN__进行赋值以达到删除的目的。在上游数据源为Kafka消息的场景中,Doris Sink无法直接使用RowKind来区分操作类型,需要依赖消息中的特定字段来标记操作类型,比如{"op_type":"delete",data:{...}},针对这类数据,希望将op_type=delete的数据删除掉。此时需要根据业务逻辑判断,显式地传入隐藏列的值。下面以Flink SQL方式为例,介绍如何根据Kafka数据中的特定字段删除SelectDB中的数据。

示例

-- 比如上游数据: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'selectdb-preview',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false表示不从RowKind获取事件类型
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- 显示指定stream load的导入列
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

常见问题

  • Q:如何写入Bitmap类型?

    A:示例如下所示:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'selectdb-preview', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q:如何解决报错:errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]。

    A:Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint或Savepoint启动,否则会报如上错误。不要求Exactly-Once时,也可通过关闭两阶段提交(2PC)sink.enable-2pc=false 或更换不同的sink.label-prefix解决。

  • Q:如何解决报错:errCode = 2, detailMessage = transaction[19650]not found。

    A:此报错发生在提交(Commit)阶段,Checkpoint里面记录的事务ID,在SelectDB侧已经过期,此时再次提交(Commit)就会出现上述错误。 此时无法从Checkpoint启动,后续可通过修改SelectDB的参数streaming_label_keep_max_second配置来延长过期时间,默认为12小时。

  • Q:如何解决报错:errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100。

    A:此报错是因为同一个库并发导入超过了100,可以通过调整SelectDB的参数max_running_txn_num_per_db来解决,详情请参见max_running_txn_num_per_db

    同时,一个任务频繁修改label重启,也可能会导致这个错误。两阶段提交(2PC)场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从Checkpoint重启时,Flink任务才会主动中止(abort)之前启动的但未完成(即已precommit但未Commit的)事务(txn)。如果频繁修改label重启,会导致大量precommit成功的事务(txn)无法被中止(abort),占用事务。在Unique模型下也可关闭两阶段提交(2PC),通过设计Sink来实现幂等写入。

  • Q:Flink写入Unique模型时,如何保证一批数据的有序性?

    A:可以添加sequence列配置来保证,更多详情,请参见sequence

  • Q:为什么Flink任务没报错,但是无法同步数据?

    A:Connector 1.1.0版本以前,是攒批写入的,写入均是由数据驱动,需要判断上游是否有数据写入。1.1.0之后,依赖Checkpoint,必须开启Checkpoint才能写入。

  • Q:如何解决报错:tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235。

    A:此报错通常发生在Connector 1.1.0版本之前,是由于写入频率过快,导致版本过多。可以通过设置sink.buffer-flush.max-bytessink.buffer-flush.interval参数来降低Stream Load的频率。

  • Q:Flink导入时有脏数据,如何跳过?

    A:Flink在数据导入时,如果有脏数据,比如字段格式、长度等问题,会导致Stream Load报错,此时Flink会不断地重试。如果需要跳过,可以通过禁用Stream Load的严格模式(strict_mode=false,max_filter_ratio=1)或者在Sink算子之前对数据做过滤。

  • Q:源表和SelectDB表应如何对应?

    A:使用Flink Doris Connector导入数据时,要注意两个方面,一是源表的列和类型要跟Flink SQL中的列和类型对应;二是Flink SQL中的列和类型要跟SelectDB表的列和类型对应。

  • Q:如何解决报错:TApplicationException: get_next failed: out of sequence response: expected 4 but got 3。

    A:此报错是由于Thrift框架存在并发bug导致的,建议您使用尽可能新的Connector以及与之兼容的Flink版本。

  • Q:如何解决报错:DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX。

    A:你可以在TaskManager中搜索日志abort transaction response,根据HTTP返回码确定是客户端(Client)的问题还是服务器(Server)的问题。