Routine Load支持提交一个常驻的导入作业,不断地从指定的数据源读取数据,将数据持续地导入至云数据库 SelectDB 版中。本文介绍如何通过Routine Load将Kafka中的数据导入至云数据库 SelectDB 版实例。
前提条件
支持的数据源:目前仅支持Kafka数据源,可通过无认证方式或PLAIN/SSL/Kerberos等认证方式连接Kafka。
支持的消息格式:
CSV
或JSON
格式。CSV的格式,每一个Message为一行,且行尾不包含换行符。
注意事项
默认支持Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka兼容旧版本,具体操作有以下两种方式:
将BE的配置
kafka_broker_version_fallback
的值设置为要兼容的旧版本。创建Routine Load的时候直接设置
property.broker.version.fallback
的值为要兼容的旧版本。
使用兼容旧版本的代价在于,Routine Load的部分新特性可能无法使用,例如根据时间设置Kafka分区的offset。
创建导入作业
使用Routine Load功能时,首先需创建一个Routine Load作业。该作业将通过例行调度持续发送任务,每个任务会消耗一定数量的Kafka消息。
语法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]
参数说明
参数名称 | 参数说明 |
| 导入作业的名称。在同一个Database内,相同名称的job只能运行一个。 |
| 指定导入的表的名称。 |
| 指定数据合并类型。默认为 |
| 指定导入数据处理相关参数。详细参数说明,请参见load_properties。 |
| 指定导入作业相关参数。详细参数说明,请参见job_properties参数说明。 |
| 指定数据源的类型。详细参数说明,请参见data_source_properties参数说明。 |
load_properties参数说明
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]
参数名称 | 示例值 | 参数说明 |
| COLUMNS TERMINATED BY "," | 指定列分隔符,默认为 |
| (k1,k2,tmpk1,k3=tmpk1+1) | 指定文件列和表中列的映射关系,以及各种列转换等。详细说明请参见数据转化。 |
| 无 | 指定过滤原始数据条件。详细说明请参见数据转化。 |
| WHERE k1>100 and k2=1000 | 指定条件对导入的数据进行过滤。详细说明请参见数据转化。 |
| PARTITION(p1,p2,p3) | 指定导入目的表的哪些Partition中。如果不指定,则会自动导入到对应的Partition中。 |
| DELETE ON v3>100 | 用于指定导入数据中表示Delete Flag的列和计算关系。 说明 需配合MEREGE导入模式一起使用,仅适用于Unique Key模型的表。 |
| 无 | 用于指定导入数据中表示Sequence Col的列。其功能为导入数据时保证数据顺序。 说明 仅适用于对Unique Key模型的表。 |
job_properties参数说明
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
这三个参数max_batch_interval、max_batch_rows和max_batch_size用于控制子任务的执行时间和处理量。当任意一个参数达到设定阈值时,任务将终止。
参数名称 | 示例值 | 参数说明 |
| "desired_concurrent_number" = "3" | 指定期望并发度。大于0,默认为 说明
|
| "max_batch_interval" = "20" | 指定每个子任务最大执行时间,单位是秒,默认为 |
| "max_batch_rows" = "300000" | 指定每个子任务最多读取的行数。默认是 |
| "max_batch_size" = "209715200" | 指定每个子任务最多读取的字节数。单位是字节,默认为 |
| "max_error_number"="3" | 指定采样窗口内,允许的最大错误行数。默认为 采样窗口为 说明 被 |
| "strict_mode"="true" | 指定是否开启严格模式,默认为
|
| "timezone" = "Africa/Abidjan" | 指定导入作业所使用的时区。默认为使用Session的时区作为参数。 说明 该参数会影响所有导入涉及的和时区有关的函数结果。 |
| "format" = "json" | 指定导入数据格式,默认为 |
| -H "jsonpaths:[\"$.k2\",\"$.k1\"]" | 当导入数据格式为 |
| -H "strip_outer_array:true" | 当导入数据格式为 |
| -H "json_root:$.RECORDS" | 当导入数据格式为JSON时,可以通过 |
| 无 | 指定设置发送批处理数据的并行度,如果并行度的值超过BE配置中的 |
| 无 | 指定是否只导入数据到对应分区的一个tablet,默认值为false。该参数只允许向对带有random分区的Duplicate表导入数据的时设置。 |
严格模式(strict mode)与原始数据(source data)的导入关系
例如列类型为TinyInt。当表中的列允许导入空值时导入关系如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa or 2000 | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 | 1 | true or false | correct data |
例如列类型为Decimal(1,0)。当表中的列允许导入空值时导入关系如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 or 10 | 1 | true or false | correct data |
10虽然是一个超过范围的值,但是因为其类型符合Decimal的要求,所以strict mode对其不产生影响。10最后会在其他ETL处理流程中被过滤。但不会被strict mode过滤。
data_source_properties参数说明
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)
参数名称 | 参数说明 |
| 指定Kafka的Broker连接信息。格式为 格式: |
| 指定订阅的Kafka的Topic。 格式: |
| 指定订阅的Kafka Partition,以及对应的每个Partition的起始Offset。如果指定时间,则会从大于等于该时间的最近一个Offset处开始消费。 Offset可以指定从大于等于0的具体Offset,或者:
如果没有指定,则默认从 示例如下。
重要 时间格式不能和OFFSET格式混用。 |
| 指定自定义Kafka参数。功能等同于Kafka shell中"--property"参数。 当参数的value为一个文件时,需要在value前加上关键词:" |
Property参数说明
使用SSL连接Kafka时,需要指定以下参数:
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"
其中
property.security.protocol
和property.ssl.ca.location
为必选项,用于指明连接方式为SSL,以及CA证书的位置。如果Kafka Server端开启了Client认证,则还需设置以下参数。
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"
分别用于指定Client的public key、private key、private key的密码。
指定Kafka Partition的默认起始offset。
没有指定
kafka_partitions/kafka_offsets
,默认消费所有分区。此时可以通过kafka_default_offsets
指定起始offset。默认为OFFSET_END
,即从末尾开始订阅。"property.kafka_default_offsets" = "OFFSET_BEGINNING"
更多支持的自定义参数,请参阅librdkafka的官方CONFIGURATION文档中,Client端的配置项。例如以下参数。
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
使用示例
创建Routine Load简单作业
创建待导入的SelectDB数据表,示例如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");
分别设置不同的参数导入数据,示例如下。
为example_db的test_table创建一个名为test1的Kafka Routine Load任务。指定列分隔符的group.id和client.id,设置自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅,示例如下。
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
为example_db的test_table创建一个名为test2的Kafka Routine Load任务。导入任务为严格模式,示例如下。
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
从指定的时间点开始消费,示例如下。
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
导入JSON格式数据
Routine Load导入的JSON格式仅支持以下两种。
只有一条记录,且为JSON对象。
当使用单表导入(即通过ON TABLE_NAME指定表名)时,JSON数据格式如下。
{"key1":"value1","key2":"value2","key3":"value3"}
当使用动态或多表导入Routine Load(即不指定具体的表名)时,JSON数据格式如下。
table_name|{"key1":"value1","key2":"value2","key3":"value3"}
JSON数组,数组中可含多条记录。
当使用单表导入(即通过ON TABLE_NAME指定表名)时,JSON数据格式如下。
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
当使用动态或多表导入(即不指定具体的表名)时,JSON数据格式如下。
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
导入JSON格式数据,示例如下。
创建待导入的SelectDB数据表,示例如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
分别使用两个类型的JSON数据记录到Topic里:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }
[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]
以不同模式导入JSON数据,示例如下。
以简单模式导入JSON数据。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
精准导入JSON格式数据。
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
说明表里的分区字段
dt
在示例数据里并没有,而是在Routine Load语句里通过dt=from_unixtime(timestamp,'%Y%m%d')
转换出来的。
访问不同验证方式的Kafka集群
根据Kafka集群验证方式的不同,访问的方式示例如下。
访问SSL认证的Kafka集群。
访问SSL认证的Kafka集群需要您提供用于认证Kafka Broker公钥的证书文件(ca.pem)。如果Kafka集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过
CREATE FILE
命令上传到SelectDB中,并且Catalog名称为kafka。上传文件,示例如下。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
创建Routine Load作业,示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );
说明SelectDB通过Kafka的C++ API
librdkafka
来访问Kafka集群。librdkafka
所支持的参数可以参阅Configuration properties。
访问PLAIN认证的Kafka集群。
访问开启PLAIN认证的Kafka集群,需要增加配置如下。
property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
property.sasl.mechanism=PLAIN:设置SASL的认证方式为PLAIN。
property.sasl.username=admin:设置SASL的用户名。
property.sasl.password=admin:设置SASL的密码。
创建Routine Load作业,示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol"="SASL_PLAINTEXT", "property.sasl.mechanism"="PLAIN", "property.sasl.username"="admin", "property.sasl.password"="admin" );
访问Kerberos认证的Kafka集群。
访问开启kerberos认证的Kafka集群,需要增加配置如下。
security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
sasl.kerberos.service.name=$SERVICENAME:设置broker servicename。
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:设置Keytab本地文件路径。
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:设置SelectDB连接Kafka时使用的Kerberos主体。
创建Routine Load作业,示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.kerberos.service.name" = "kafka", "property.sasl.kerberos.keytab" = "/etc/krb5.keytab", "property.sasl.kerberos.principal" = "id@your.com" );
说明若要使SelectDB访问开启kerberos认证方式的Kafka集群,需要在SelectDB集群所有运行节点上部署Kerberos客户端kinit,并配置krb5.conf,填写KDC服务信息等。
配置
property.sasl.kerberos.keytab
的值需要指定Keytab本地文件的绝对路径,并允许SelectDB进程访问该本地文件。
修改导入作业
修改已经创建的例行导入作业。只能修改处于PAUSED
状态的作业。
语法
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]
参数说明
参数名称 | 参数说明 |
[db.]job_name | 指定要修改的作业名称。 |
tbl_name | 指定需要导入的表的名称。 |
job_properties | 指定需要修改的作业参数。目前仅支持修改的参数如下。
|
data_source | 指定数据源的类型。当前支持: |
data_source_properties | 指定数据源的相关属性。目前仅支持的属性如下。
说明
|
使用示例
将
desired_concurrent_number
修改为1,示例如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "1" );
将
desired_concurrent_number
修改为10,修改partition的offset,修改group id,示例如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "10" ) FROM kafka ( "kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 100", "property.group.id" = "new_group" );
暂停导入作业
暂停一个Routine Load作业。被暂停的作业可以通过RESUME
命令重新运行。
语法
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;
参数说明
参数名称 | 参数说明 |
[db.]job_name | 指定要暂停的作业名称。 |
使用示例
暂停名称为test1的Routine Load作业,示例如下。
PAUSE ROUTINE LOAD FOR test1;
暂停所有Routine Load作业,示例如下。
PAUSE ALL ROUTINE LOAD;
恢复导入作业
恢复一个被暂停的Routine Load作业。恢复的作业将继续从之前已消费的offset继续消费。
语法
RESUME [ALL] ROUTINE LOAD FOR <job_name>
参数说明
参数名称 | 参数说明 |
[db.]job_name | 指定要恢复的作业名称。 |
使用示例
恢复名称为test1的Routine Load作业,示例如下。
RESUME ROUTINE LOAD FOR test1;
恢复所有Routine Load作业,示例如下。
RESUME ALL ROUTINE LOAD;
停止导入作业
停止一个Routine Load作业。被停止的作业无法再重新运行。停止导入后,已导入数据不会回滚。
语法
STOP ROUTINE LOAD FOR <job_name>;
参数说明
参数名称 | 参数说明 |
[db.]job_name | 指定要停止的作业名称。 |
使用示例
停止名称为test1的Routine Load作业,示例如下。
STOP ROUTINE LOAD FOR test1;
查看导入作业
Routine Load作业运行状态需要通过SHOW ROUTINE LOAD
命令查看。
语法
SHOW [ALL] ROUTINE LOAD [FOR job_name];
参数说明
参数名称 | 参数说明 |
[db.]job_name | 指定要查看的作业名称。 |
如果导入数据格式不合法,详细的错误信息会记录在ErrorLogUrls中。注意其中包含多个链接,拷贝其中任意一个链接在浏览器中查看即可。
使用示例
展示名称为test1的所有Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。
SHOW ALL ROUTINE LOAD FOR test1;
展示名称为test1的当前正在运行的Routine Load作业。
SHOW ROUTINE LOAD FOR test1;
显示example_db下,所有的Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。
use example_db; SHOW ALL ROUTINE LOAD;
显示example_db下,所有正在运行的Routine Load作业。
use example_db; SHOW ROUTINE LOAD;
显示example_db下,名称为test1的当前正在运行的Routine Load作业。
SHOW ROUTINE LOAD FOR example_db.test1;
显示example_db下,名称为test1的所有Routine Load作业(包括已停止或取消的作业)。结果为一行或多行。
SHOW ALL ROUTINE LOAD FOR example_db.test1;
相关系统配置
相关系统配置参数会影响Routine Load的使用。
max_routine_load_task_concurrent_num
FE配置项,默认为5,可以运行时修改。该参数限制了一个例行导入作业最大的子任务并发数。建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。
max_routine_load_task_num_per_be
FE配置项,默认为5,可以运行时修改。该参数限制了每个BE节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。
max_routine_load_job_num
FE配置项,默认为100,可以运行时修改。该参数限制Routine Load作业的总数,包括NEED_SCHEDULED,RUNNING,PAUSE这些状态。超过后,不能再提交新的作业。
max_consumer_num_per_group
BE 配置项,默认为3。该参数表示一个子任务中最多生成几个Consumer进行数据消费。对于Kafka数据源,一个Consumer可能消费一个或多个Kafka Partition。假设一个任务需要消费6个Kafka Partition,则会生成3个Consumer,每个Consumer消费2个Partition。如果只有2个Partition,则只会生成2个Consumer,每个Consumer消费1个Partition。
max_tolerable_backend_down_num
FE配置项,默认值是0。在满足某些条件下,SelectDB可令PAUSED的任务重新调度,变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。
period_of_auto_resume_min
FE配置项,默认是5分钟。该项意味着当SelectDB重新调度任务时,只会在5分钟这个周期内最多尝试3次。如果3次都失败则锁定当前任务,后续不再进行调度,但可通过人为干预,进行手动恢复。
其他说明
Routine Load作业和ALTER TABLE操作的关系。
Routine Load不会阻塞Schema变更和ROLLUP操作。但Schema变更完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在Routine Load作业中显式指定列映射关系,并且通过增加Nullable列或带Default值的列来减少这些问题。
删除表的Partition可能会导致导入数据无法找到对应的Partition,导致作业进入暂停状态。
Routine Load作业和其他导入作业的关系(LOAD,DELETE,INSERT)。
Routine Load和其他LOAD作业以及INSERT操作没有冲突。
当执行DELETE操作时,对应表分区不能有任何正在执行的导入任务。因此在执行DELETE操作前,需要暂停Routine Load作业,并等待已下发的任务全部完成后,方可以执行DELETE。
Routine Load作业和DROP DATABASE或DROP TABLE操作的关系。
当Routine Load对应的database或table被删除后,作业会自动CANCEL。
Kafka类型的Routine Load作业和Kafka topic的关系。
当创建例行导入声明的
kafka_topic
在Kafka集群中不存在时:Kafka集群的Broker设置了
auto.create.topics.enable=true
,则topic会先被自动创建,自动创建的Partition个数是由您的Kafka集群中的Broker配置num.partitions决定的。例行作业会不断读取该topic的数据。Kafka集群的Broker设置了
auto.create.topics.enable=false
,则topic不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态置为PAUSED。
所以,如果您希望当Kafka topic不存在的时候,被例行作业自动创建的话,只需要将Kafka集群中的Broker设置
auto.create.topics.enable=true
即可。当环境中存在网段和域名解析的隔离措施,因此需要注意以下问题。
创建Routine Load任务中指定的Broker list必须能够被SelectDB服务访问。
Kafka中如果配置了
advertised.listeners
,advertised.listeners
中的地址必须能够被SelectDB服务访问。
指定消费的Partition和Offset。
SelectDB支持指定Partition,Offset和时间点进行消费。这里说明下对应参数的配置关系。
三个相关参数如下。
kafka_partitions
:指定待消费的Partition列表,如:"0,1,2,3"。kafka_offsets
:指定每个分区的起始offset,必须和kafka_partitions
列表个数对应。如:"1000,1000,2000,2000"property.kafka_default_offset
:指定分区默认的起始Offset。
在创建导入作业时,这三个参数可以有以下五种组合方式。
方式
kafka_partitions
kafka_offsets
property.kafka_default_offset
行为
1
No
No
No
系统会自动查找Topic对应的所有分区并从OFFSET_END开始消费。
2
No
No
Yes
系统会自动查找Topic对应的所有分区并从default offset指定的位置开始消费。
3
Yes
No
No
系统会从指定分区的OFFSET_END开始消费。
4
Yes
Yes
No
系统会从指定分区的指定Offset处开始消费。
5
Yes
No
Yes
系统会从指定分区,default Offset指定的位置开始消费。
STOP和PAUSE的区别。
FE会自动定期清理STOP状态的Routine Load,而PAUSE状态的则可以再次被恢复启用。