快速入门

Lindorm Ganos时空服务提供了流引擎接口,适用于车联网、物联网、自动驾驶等场景。您可以通过Lindorm流引擎的实时计算和分析能力处理轨迹点数据,并结合时空函数,实现多种实时轨迹分析需求,例如电子围栏、区域统计、轨迹生成等。

前提条件

  • 已开通Lindorm Ganos时空服务。如何开通,请参见开通时空服务(免费)

  • 已开通流引擎。如何开通,请参见开通流引擎

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

  • 已将客户端IP地址添加至Lindorm白名单。具体操作,请参见设置白名单

操作步骤

本文以车联网的轨迹偏航场景为例:实时分析车辆轨迹点,结合地理围栏计算车辆当前所在位置,方便您判断车辆是否已偏离设定的路线。

创建地理围栏表、结果表

在宽表引擎中创建两张表,分别用于存储地理围栏数据和流引擎计算结果。

  1. 通过Lindorm-cli连接宽表引擎。如何连接,请参见通过Lindorm-cli连接并使用宽表引擎

  2. 创建地理围栏表regions,包含三个列:rID(地理围栏区域ID)、rName(地理围栏区域名)、fence(地理围栏数据)。

    CREATE TABLE regions(
      rID INT,
      rName VARCHAR,
      fence GEOMETRY,
      PRIMARY KEY(rID)
      );
  3. 创建结果表fresult,包含三个列:uID(车辆ID)、rName(区域名称)、rID(区域ID)。

    CREATE TABLE fresult(
      uID VARCHAR, 
      rName VARCHAR, 
      rID INT, 
      PRIMARY KEY (uID)
      );

写入地理围栏数据

向地理围栏表regions写入测试数据,共三个区域:SoHo、ChinatownTribeca。

INSERT INTO regions(rID, rName, fence) VALUES 
(1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')),
(2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')),
(3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));

ST_GeomFromText函数的详细说明,请参见ST_GeomFromText

接入流数据

连接Lindorm流引擎,并将示例数据写入Kafka Topic中。本示例将通过开源Kafka脚本工具连接Lindorm流引擎。

  1. 连接Lindorm流引擎,并创建名为logVehicleKafka Topic。具体操作,请参见通过开源Kafka脚本工具连接Lindorm流引擎

  2. 执行以下命令,将示例数据写入到已创建的Kafka Topic中。示例数据获取:testcar.txt

    ./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle < testcar.txt
    说明
    • 您需要下载示例数据testcar.txt,并将其上传至开源Kafka脚本工具的根目录下。

    • 您可以使用./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle --from-beginning命令,查看数据是否成功写入。

提交流引擎计算任务

使用Flink SQL提交Lindorm流引擎计算任务,读取开源Kafka topic中的数据并结合宽表数据做计算。

  1. 连接到Lindorm流引擎,详情请参见使用流引擎

  2. 创建地理围栏计算任务。

    下面的计算任务将流入Kafka的点与Lindorm表做地理围栏过滤计算,输出每个点所在的行政区划。任务流程如下:

    1. 加载ganos函数模块。

    2. Flink Job中创建三张表:数据源表carData、数据维表regions、结果表fresult,通过设置连接器参数,分别关联已创建的Kafka Topic、地理围栏表regions和结果表fresult。

    3. 创建流任务,使用geomHint搭配st_contains函数过滤数据并写入结果表,此处使用了索引加速。

    CREATE FJOB fenceFilter (
      LOAD MODULE ganos;
      -- enable parallelism
      SET 'parallelism.default'='8';
      -- create stream table
      CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME()
        ) WITH (
        'connector'='kafka',
        'topic'='logVehicle',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>',
        'format'='json'
      );
      -- create area table
      CREATE TABLE regions (
        `rID` INT,
        `rName` STRING,
        `fence` GEOMETRY,
        PRIMARY KEY (`rID`) NOT ENFORCED
        ) WITH (
        'connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='<Lindorm宽表引擎的用户名>',
        'password'='<Lindorm宽表引擎的密码>',
        'tableName'='regions',
        'namespace'='<地理围栏表regions所在的数据库>'
      );
      --create result table
      CREATE TABLE fresult (
        `uID` STRING,
        `rName` STRING,
        `rID` INT,
        PRIMARY KEY (`uID`) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='<Lindorm宽表引擎的用户名>',
        'password'='<Lindorm宽表引擎的密码>',
        'tableName'='fresult',
        'namespace'='<结果表fresult所在的数据库>'
      );
      -- find the area that car located
      INSERT INTO fresult 
        SELECT A.uID, B.rName, B.rID
        FROM carData AS A 
        JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ 
        FOR SYSTEM_TIME AS OF A.proctime AS B 
        ON B.fence=ST_MakePoint(A.x,A.y);
    );
    说明

查看结果

宽表引擎中,通过SQL语句查看计算结果。

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 查询计算结果。

    SELECT * FROM fresult;

    返回结果:

    +-----+---------------+-----+
    | uID |     rName     | rID |
    +-----+---------------+-----+
    | A   | Soho          | 1   |
    | B   | Chinatown     | 2   |
    | C   | Soho          | 1   |
    | D   | Soho          | 1   |
    | E   | Chinatown     | 2   |
    | F   | Tribeca       | 3   |
    | G   | Chinatown     | 2   |
    | H   | Chinatown     | 2   |
    +-----+---------------+-----+

    返回结果为不同车辆的所在位置,例如uIDA的车辆当前位于名称Soho的位置。