使用Flink写入时序引擎

Flink可以处理实时数据流,并将处理结果写入Lindorm时序引擎,以实现实时数据监控等场景。本文介绍如何将Flink上实时的数据处理结果写入到时序引擎。

前提条件

  • 已开通实时计算Flink版或者已有自建Flink。实时计算Flink版的开通,请参见开通实时计算Flink

    说明

    实时计算Flink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。

  • 为了保证网络的连通性,确保云原生多模数据库 Lindorm实例和实时计算Flink使用相同的专有网络。

    说明

    实时计算Flink版默认不具备访问公网的能力,如需通过公网将数据写入时序引擎,请参见Flink全托管集群如何访问公网

  • 已开通时序引擎。

  • Lindorm时序引擎为3.4.7及以上版本,如何查看或升级时序引擎版本,请参见时序引擎版本说明升级小版本

  • 已将FlinkIP地址添加到Lindorm白名单。如果您使用的是实时计算Flink版,查看IP地址的操作,请参见如何设置白名单。添加Lindorm白名单的操作,请参见设置白名单

背景信息

时序引擎Sink连接器用于连接其他系统与Lindorm时序引擎,负责从各种数据源接收数据并写入到时序引擎。实时计算Flink版通过Flink SQL定义源表、维表和结果表,通过定义时序引擎Sink连接器的参数,将结果表映射到Lindorm时序表,从而将Flink处理后的结果数据写入Lindorm时序引擎。使用时序引擎SINK插件,需要先获取时序引擎SINK插件,再将JAR包上传至实时计算Flink版控制台,上传方法请参见JAR作业开发

语法

在实时计算Flink上创建结果表,并配置时序引擎Sink连接器参数,实现Flink结果表到Lindorm时序表的映射。

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR(可选字段)
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='<lindormTSDBHttpUrl>',
    'table'='<yourTableName>',
    'defaultDatabase'='<yourDatabaseName>',
    'schemaPolicy'='<schemaPolicy>',
    'sink.parallelism'='<sinkParallelism>'
    'ignoreErrorData'='<ignoreErrorData>',
    'maxRetries'='<maxRetries>',
    'batchSize'='<batchSize>',
    'connectTimeoutMs'='<connectTimeoutMs>',
    'sync'='<sync>',
    'debug'='<debug>'
);

参数说明

结果表结构参数说明

字段名

数据类型

是否必选

说明

timestamp

BIGINT

字段名必须为timestamp,且字段类型必须为BIGINT。

单位为毫秒(ms)。

说明
  • timestamp为保留字,请用反引号(``)将timestamp括起来。

  • 字段值为13位时间戳,如果是10位时间戳,写入会自动转换为13位。

tag_tagname

VARCHAR

指定时序数据的标签(Tag)。tag_表示前缀,不能省略和修改。tagname表示时序数据标签名称。

示例:tag_deviceid。

说明

tag_tagname可以为一列或者多列。

field_fieldname

DOUBLE、VARCHAR、BIGINT、BOOLEAN

指定时序数据量测值(Field)。field_表示前缀,不能省略和修改。fieldname表示时序数据量测值名称。

示例:field_humidity。

说明

field_fieldname可以为一列或者多列。

table

VARCHAR

指定时序数据表。

  • 如果写入一张时序表,建议在WITH参数中配置。

  • 如果同时写入多张时序表,可以在表结构的table字段中配置。

WITH参数说明

参数

是否必选

说明

connector

固定值lindormtsdb,指定时序引擎SINK插件。

url

Lindorm时序引擎的HTTP连接地址,获取方法请参见查看连接地址

table

指定时序数据表。

  • 如果写入一张时序表,建议在WITH参数中配置。

  • 如果同时写入多张时序表,可以在表结构的table字段中配置。

username

条件必选

连接时序引擎的用户名和密码。

如已开启用户认证与权限校验,则必须输入用户名和密码。否则无需输入。

说明

时序引擎默认未开启用户认证与权限校验。为了数据安全,建议您开启时序引擎的用户认证与权限校验。

password

条件必选

defaultDatabase

写入数据的数据库。默认值为default。

schemaPolicy

Schema约束策略。

  • Strong:强约束,默认值。时序引擎会严格依据预先定义的表结构对写入数据的表名、字段名、类型进行校验。选择Strong,需提前手动建表,否则数据写入会失败。

  • Weak:弱约束。写入数据的表不存在时,不会报错,而是会自动创建对应的表。

  • None:无约束。写入数据的表不存在时,不会报错,也不会自动建表。如果不手动建表,不影响数据写入,但无法直接通过SQL查询写入的数据。

说明

更多信息请参见Schema约束

sink.parallelism

写入并发度,当写入数据量较大时可适当增加并发度,默认值为1 。

ignoreErrorData

是否忽略写入错误。

  • false:不忽略,默认值。如果遇到错误,就跳出写入。

  • true:忽略。如果遇到错误就忽略错误,继续写入。

maxRetries

写入时遇到服务端内部错误或者网络错误时最大重试次数,默认值为3。

batchSize

批处理大小,即每次写入数据库的数据量,默认值为500个数据点。

connectTimeoutMs

HTTP连接超时时间,默认值为90000。单位为毫秒(ms)。

debug

是否开启debug模式,用来打印详细数据点日志。

  • false:不开启,默认值。

  • true:开启。

sync

是否同步写入,建议使用false。

  • false:异步写入,默认值,写入效率高。

  • true:同步写入。

使用示例

datagen_source随机数据生成器为例,将生成的数据写入Lindorm时序表mytable中。示例代码如下:

CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable',
    'schemaPolicy'='weak'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;