在车联网场景中,为了方便分析和预测,车辆的实时位置点会被聚合为一条轨迹线,每产生一个新的位置点都会被追加至轨迹线中。因为位置点是实时上传的且数据量较大,所以数据库在处理追加数据的操作时通常会消耗大量的IO。为解决这一问题,Lindorm Ganos时空服务结合Lindorm流引擎的实时计算能力,提供了轨迹生成方案。本文介绍轨迹生成的技术实现和实际操作步骤。
背景信息
在车联网场景中,车辆会持续地上传位置点(GPS Point)数据,这些位置点构成了一条轨迹线(Trajectory/LineString)。基于轨迹线,可以开展多种高效的分析及预测工作。例如回溯车辆的行驶轨迹、基于多条轨迹统计常行驶路线、计算车辆轨迹之间的相似性等等。
以下是在车联网场景中的一个示例:每小时将两辆车的位置点拼接为轨迹。
技术实现
位置点聚合成轨迹线可以看作是追加数据的过程,每次上传一个新的轨迹点都要追加到已有的轨迹线中。
这一过程在数据库层面表现为频繁的读写聚合操作:先读取数据库中已有的轨迹线数据,再在内存中将新的轨迹点数据和已有的轨迹线聚合为新的轨迹线,最后将新的轨迹线写入至数据库中。这个过程非常消耗IO,如果数据量过大,则可能会影响查询响应速度。
为解决这一问题,Lindorm Ganos时空服务基于流引擎提供了轨迹生成方案。Lindorm流引擎读取车辆的实时位置点数据,定期(每小时、每天)将位置点拼接为轨迹,再将聚合后的轨迹线数据写入至数据库,不仅保证了对车辆行驶轨迹数据的实时处理和分析能力,同时也减轻了数据库在处理高频率追加写入操作时的IO压力。
轨迹生成共涉及一种源数据:车辆的实时位置数据。
车辆位置信息是实时上传的流数据,可以实时保存在Kafka Topic中。Lindorm流引擎将读取Kafka Topic中的位置数据,根据计算任务进行实时计算,并将计算结果保存在Lindorm宽表中。
前提条件
已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单。
已开通Lindorm Ganos时空服务。如何开通,请参见开通时空服务(免费)。
已开通Lindorm流引擎。如何开通,请参见开通流引擎。
注意事项
如果应用部署在ECS实例,通过专有网络访问Lindorm实例前,需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。
所在地域相同,并建议所在可用区相同(以减少网络延时)。
ECS实例与Lindorm实例属于同一专有网络。
步骤一:创建结果表
在宽表引擎中创建结果表,用于保存和查询计算结果。
创建结果表resultAgg1,包含五个字段:cid(车辆ID)、stt(窗口开始时间)、edd(窗口结束时间)、line(轨迹线)、timeseries(时间戳)。
CREATE TABLE resultAgg1(cid INT, stt TIMESTAMP, edd TIMESTAMP, line VARCHAR, timeseries VARCHAR, PRIMARY KEY (cid,stt));
步骤二:写入流数据
Lindorm流引擎完全兼容开源Kafka API,您可以通过Kafka开源客户端或脚本工具连接Lindorm流引擎并写入测试数据。
以通过开源Kafka脚本工具写入数据为例。
下载并安装Kafka脚本工具。具体操作,请参见通过开源Kafka脚本工具连接Lindorm流引擎。
执行以下命令,创建名为tdrive的Kafka Topic。
bin/kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic tdrive --create
其中,Lindorm Stream Kafka地址为流引擎的Kafka连接地址,仅支持通过专有网络访问。获取方式,请参见查看连接地址。
执行以下命令,将示例数据
car1.txt
(测试用位置点数据)写入Kafka Topic中。示例数据获取:car1.txt。./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic tdrive < car1.txt
步骤三:提交流引擎计算任务
使用Flink SQL提交Lindorm流引擎计算任务,读取Kafka Topic中的数据进行计算。
连接Lindorm流引擎。如何连接,请参见步骤二:安装流引擎客户端。
提交计算任务。
计算任务将写入Kafka Topic的点根据指定的时间窗口和顺序拼接为轨迹,具体步骤如下:
加载
ganos
函数模块。在Flink Job中创建两张表:数据源表tdrive和数据结果表resultAgg1,通过设置连接器参数,分别关联已创建的Kafka Topic和结果表resultAgg1。
创建流任务,使用
ST_MakeLine_Ts_Agg
函数将点拼接为轨迹,并按照输入的时间戳(Timestamp)列对数据进行排序,使用LISTAGG
函数拼接时间戳,最后将计算结果写入结果表resultAgg1。
CREATE FJOB lineAgg1 ( LOAD MODULE ganos; -- create stream table CREATE TABLE tdrive( cid INT, ts TIMESTAMP(0), lng DOUBLE, lat DOUBLE, WATERMARK FOR ts AS ts-INTERVAL '10' MINUTES ) WITH ( 'connector'='kafka', 'topic'='tdrive', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>', 'format'='json', 'json.ignore-parse-errors'='true' ); --create result table CREATE TABLE resultAgg1( cid INT, stt TIMESTAMP(0), edd TIMESTAMP(0), line STRING, PRIMARY KEY(cid,stt) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<Lindorm宽表HBase Java API访问地址>', 'userName'='<Lindorm宽表引擎的用户名>', 'password'='<Lindorm宽表引擎的密码>', 'tableName'='resultAgg1', 'namespace'='<结果表resultAgg1所在的数据库>'); INSERT INTO resultAgg1 SELECT cid, window_start AS stt, window_end AS edd, ST_AsText(ST_MakeLine_Ts_Agg(ST_MakePoint(lng,lat),ts)) AS line, LISTAGG(CAST(ts AS VARCHAR)) AS timeseries FROM TABLE(TUMBLE(TABLE tdrive, DESCRIPTOR(ts), INTERVAL '1' HOUR)) WHERE lng IS NOT NULL AND lat IS NOT NULL AND ST_IsEmpty(ST_MakePoint(lng,lat))=false GROUP BY window_start,window_end,cid; );
说明Lindorm宽表HBase Java API访问地址的获取方式,请参见查看连接地址。
计算任务中使用到的函数,请参见ST_AsText、ST_MakePoint、ST_MakeLine_Ts_Agg和TUMBLE窗口函数。
步骤四:查看结果
执行以下语句,查看区域统计结果。
SELECT * FROM resultAgg1 LIMIT 5;
返回结果:
+-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+ | cid | stt | edd | line | timeseries | +-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+ | 1 | 2008-02-02 15:00:00 +0000 UTC | 2008-02-02 16:00:00 +0000 UTC | LINESTRING (116.51172 | 2008-02-02 | | | | | 39.92123, 116.51135 39.93883, | 07:36:08.000,2008-02-02 | | | | | 116.51627 39.91034) | 07:46:08.000,2008-02-02 | | | | | | 07:56:08.000 | | 1 | 2008-02-02 16:00:00 +0000 UTC | 2008-02-02 17:00:00 +0000 UTC | LINESTRING (116.47186 | 2008-02-02 | | | | | 39.91248, 116.47217 39.92498, | 08:06:08.000,2008-02-02 | | | | | 116.47179 39.90718, 116.45617 | 08:16:08.000,2008-02-02 | | | | | 39.90531) | 08:26:08.000,2008-02-02 | | | | | | 08:36:08.000 | | 1 | 2008-02-02 17:00:00 +0000 UTC | 2008-02-02 18:00:00 +0000 UTC | LINESTRING (116.47191 | 2008-02-02 | | | | | 39.90577, 116.50661 39.9145) | 09:00:24.000,2008-02-02 | | | | | | 09:10:24.000 | | 1 | 2008-02-02 20:00:00 +0000 UTC | 2008-02-02 21:00:00 +0000 UTC | LINESTRING (116.49625 39.9146, | 2008-02-02 | | | | | 116.50962 39.91071, 116.52231 | 12:30:34.000,2008-02-02 | | | | | 39.91588) | 12:40:33.000,2008-02-02 | | | | | | 12:50:33.000 | | 1 | 2008-02-02 21:00:00 +0000 UTC | 2008-02-02 22:00:00 +0000 UTC | LINESTRING (116.56444 | 2008-02-02 | | | | | 39.91445, 116.59512 39.90798, | 13:00:33.000,2008-02-02 | | | | | 116.65522 39.8622, 116.69164 | 13:10:33.000,2008-02-02 | | | | | 39.85165, 116.69167 39.85166) | 13:30:33.000,2008-02-02 | | | | | | 13:40:33.000,2008-02-02 | | | | | | 13:50:33.000 | +-----+-------------------------------+-------------------------------+--------------------------------+--------------------------------+
其中line列为聚合得到的轨迹线,timeseries为位置点对应的时间戳。