Lindorm Ganos时空服务提供了流引擎接口,适用于车联网、物联网、自动驾驶等场景。您可以通过Lindorm流引擎的实时计算和分析能力处理轨迹点数据,并结合时空函数,实现多种实时轨迹分析需求,例如电子围栏、区域统计、轨迹生成等。
前提条件
已开通Lindorm Ganos时空服务。如何开通,请参见开通时空服务(免费)。
已开通流引擎。如何开通,请参见开通流引擎。
已安装Java环境,要求安装JDK 1.8及以上版本。
已将客户端IP地址添加至Lindorm白名单。具体操作,请参见设置白名单。
操作步骤
本文以车联网的轨迹偏航场景为例:实时分析车辆轨迹点,结合地理围栏计算车辆当前所在位置,方便您判断车辆是否已偏离设定的路线。
创建地理围栏表、结果表
在宽表引擎中创建两张表,分别用于存储地理围栏数据和流引擎计算结果。
通过Lindorm-cli连接宽表引擎。如何连接,请参见通过Lindorm-cli连接并使用宽表引擎。
创建地理围栏表regions,包含三个列:rID(地理围栏区域ID)、rName(地理围栏区域名)、fence(地理围栏数据)。
CREATE TABLE regions( rID INT, rName VARCHAR, fence GEOMETRY, PRIMARY KEY(rID) );
创建结果表fresult,包含三个列:uID(车辆ID)、rName(区域名称)、rID(区域ID)。
CREATE TABLE fresult( uID VARCHAR, rName VARCHAR, rID INT, PRIMARY KEY (uID) );
写入地理围栏数据
向地理围栏表regions写入测试数据,共三个区域:SoHo、Chinatown和Tribeca。
INSERT INTO regions(rID, rName, fence) VALUES
(1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')),
(2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')),
(3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));
ST_GeomFromText函数的详细说明,请参见ST_GeomFromText。
接入流数据
连接Lindorm流引擎,并将示例数据写入Kafka Topic中。本示例将通过开源Kafka脚本工具连接Lindorm流引擎。
连接Lindorm流引擎,并创建名为logVehicle的Kafka Topic。具体操作,请参见通过开源Kafka脚本工具连接Lindorm流引擎。
执行以下命令,将示例数据写入到已创建的Kafka Topic中。示例数据获取:testcar.txt。
./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle < testcar.txt
说明您需要下载示例数据testcar.txt,并将其上传至开源Kafka脚本工具的根目录下。
您可以使用
./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle --from-beginning
命令,查看数据是否成功写入。
提交流引擎计算任务
使用Flink SQL提交Lindorm流引擎计算任务,读取开源Kafka topic中的数据并结合宽表数据做计算。
连接到Lindorm流引擎,详情请参见使用流引擎。
创建地理围栏计算任务。
下面的计算任务将流入Kafka的点与Lindorm表做地理围栏过滤计算,输出每个点所在的行政区划。任务流程如下:
加载
ganos
函数模块。在Flink Job中创建三张表:数据源表carData、数据维表regions、结果表fresult,通过设置连接器参数,分别关联已创建的Kafka Topic、地理围栏表regions和结果表fresult。
创建流任务,使用
geomHint
搭配st_contains
函数过滤数据并写入结果表,此处使用了索引加速。
CREATE FJOB fenceFilter ( LOAD MODULE ganos; -- enable parallelism SET 'parallelism.default'='8'; -- create stream table CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME() ) WITH ( 'connector'='kafka', 'topic'='logVehicle', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>', 'format'='json' ); -- create area table CREATE TABLE regions ( `rID` INT, `rName` STRING, `fence` GEOMETRY, PRIMARY KEY (`rID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<Lindorm宽表HBase Java API访问地址>', 'userName'='<Lindorm宽表引擎的用户名>', 'password'='<Lindorm宽表引擎的密码>', 'tableName'='regions', 'namespace'='<地理围栏表regions所在的数据库>' ); --create result table CREATE TABLE fresult ( `uID` STRING, `rName` STRING, `rID` INT, PRIMARY KEY (`uID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<Lindorm宽表HBase Java API访问地址>', 'userName'='<Lindorm宽表引擎的用户名>', 'password'='<Lindorm宽表引擎的密码>', 'tableName'='fresult', 'namespace'='<结果表fresult所在的数据库>' ); -- find the area that car located INSERT INTO fresult SELECT A.uID, B.rName, B.rID FROM carData AS A JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ FOR SYSTEM_TIME AS OF A.proctime AS B ON B.fence=ST_MakePoint(A.x,A.y); );
说明ST_MakePoint函数的详细说明,请参见ST_MakePoint。
查看结果
在宽表引擎中,通过SQL语句查看计算结果。
查询计算结果。
SELECT * FROM fresult;
返回结果:
+-----+---------------+-----+ | uID | rName | rID | +-----+---------------+-----+ | A | Soho | 1 | | B | Chinatown | 2 | | C | Soho | 1 | | D | Soho | 1 | | E | Chinatown | 2 | | F | Tribeca | 3 | | G | Chinatown | 2 | | H | Chinatown | 2 | +-----+---------------+-----+
返回结果为不同车辆的所在位置,例如uID为A的车辆当前位于名称Soho的位置。