电子围栏

电子围栏也被称为电子警报系统,多用于城市规划、交通调度等数据量大、实时性更强的场景。如果您需要实时调度和管理车辆,可以依据本方案的操作步骤,使用Lindorm Ganos时空函数计算和判断车辆实时位置和地理围栏的关系,实现对车辆的精确监控。

背景信息

在车辆管理场景中,车辆电子围栏可以用于监控车辆的行驶范围、判断车辆是否偏离预定路线等,帮助管理人员更好地进行车辆调度和管理。

通常管理人员会预先设置好若干个区域,简称地理围栏,由于该区域不是实时变化的,可以使用外表的形式存储到Lindorm宽表中。而车辆的实时数据(坐标信息)是实时上传的,可以将车辆的实时数据存储到Kafka中,再由Lindorm流引擎通过订阅Kafka中的实时数据,实时计算多个车辆位置和地理围栏之间的关系。

前提条件

  • 已开通Lindorm Ganos时空服务。如何开通,请参见开通时空服务(免费)

  • 已开通Lindorm流引擎。

    说明

    请加入钉钉群(群号:78080001631),申请流引擎开通权限。

  • 已将客户端IP地址添加至Lindorm白名单。如何添加,请参见设置白名单

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

注意事项

本实践将Lindorm客户端部署在ECS实例上,并通过专有网络进行数据访问。

通过专有网络访问Lindorm实例前,需要确保您的Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

  • ECS实例与Lindorm实例属于同一专有网络。

步骤一:创建地理围栏表和结果表

在宽表引擎中创建地理围栏表和结果表,分别用于保存地理围栏数据和计算结果。

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 创建地理围栏表并插入示例数据。

    1. 创建地理围栏表regions,包含三个字段:rID(区域ID)、rName(区域名称)、fence(地理围栏范围)。

      CREATE TABLE regions(rID INT, rName VARCHAR, fence GEOMETRY, PRIMARY KEY(rID));
    2. 插入地理围栏数据,包括3个区域,分别命名为SoHo、ChinatownTribeca。

      INSERT INTO regions(rID, rName, fence) VALUES 
      (1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')),
      (2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')),
      (3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));
  3. 创建结果表fresult

    CREATE TABLE fresult(uID VARCHAR, rName VARCHAR, rID INT, PRIMARY KEY (uID));

步骤二:写入流数据

您可以通过开源客户端或脚本工具连接Lindorm流引擎并写入测试数据。

以通过开源脚本工具写入为例。

  1. 下载并安装脚本工具。具体操作,请参见通过开源脚本工具连接Lindorm流引擎

  2. 创建名为log_topicTopic。

    ./bin/kafka-topics.sh --bootstrap-server <Lindorm Stream连接地址> --topic log_topic --create

    Lindorm Stream连接地址获取方式如下:image

  3. 将测试数据写入Topic中,使用组合键Ctrl+C可终止写入。

    ./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream连接地址> --topic log_topic
    {"uID": "A", "x":"-74.00035", "y": "40.72432"}
    {"uID": "B", "x":"-74.00239", "y": "40.71692"}
    {"uID": "C", "x":"-74.00201", "y": "40.72563"}
    {"uID": "D", "x":"-74.00158", "y": "40.72412"}
    {"uID": "E", "x":"-73.99836", "y": "40.71588"}
    {"uID": "F", "x":"-74.01015", "y": "40.71422"}
    {"uID": "G", "x":"-73.99183", "y": "40.71451"}
    {"uID": "H", "x":"-73.99595", "y": "40.71773"}

    您可以使用./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream连接地址> --topic log_topic --from-beginning命令,查看数据是否成功写入。

步骤三:提交计算任务

使用Flink SQL提交计算任务,读取Topic中的数据,并结合地理围栏数据进行计算。

  1. 安装流引擎客户端。

    1. ECS上执行以下命令,下载流引擎客户端压缩包。

      wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
    2. 执行以下命令,解压压缩包。

      tar zxvf lindorm-sqlline-2.0.2.tar.gz
    3. 进入lindorm-sqlline-2.0.2/bin目录,执行以下命令连接至Lindorm流引擎。

      ./lindorm-sqlline -url <Lindorm Stream SQL地址>
  2. 提交计算任务。

    计算任务具体步骤如下:

    1. 加载ganos函数模块。

    2. Flink Job中创建三张表:数据源表carData、数据维表regions、数据结果表fresult,通过设置连接器参数,分别关联已创建的Topic、地理围栏表regions和结果表fresult

    3. 创建流任务,使用关系函数ST_Contains进行过滤,计算实时数据所属的地理围栏区域,并将计算结果写入结果表fresult

    示例代码如下:

    CREATE FJOB fenceFilter (
      LOAD MODULE ganos;
      -- Enable parallelism
      SET 'parallelism.default'='12';
      -- Create stream table
      CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME()
        ) WITH (
        'connector'='kafka',
        'topic'='log_topic',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='<Lindorm Stream连接地址>',
        'format'='json'
      );
      -- Create area table
      CREATE TABLE regions (
        `rID` INT,
        `rName` STRING,
        `fence` GEOMETRY,
        PRIMARY KEY (`rID`) NOT ENFORCED
        ) WITH (
        'connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='<宽表引擎用户名>',
        'password'='<宽表引擎密码>',
        'tableName'='regions',
        'namespace'='default'
      );
      -- Create result table
      CREATE TABLE fresult (
        `uID` STRING,
        `rName` STRING,
        `rID` INT,
        PRIMARY KEY (`uID`) NOT ENFORCED
      ) WITH (
        'connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='<宽表引擎用户名>',
        'password'='<宽表引擎密码>',
        'tableName'='fresult',
        'namespace'='default'
      );
      -- Find the area that car located
      INSERT INTO fresult 
        SELECT A.uID, B.rName, B.rID
        FROM carData AS A 
        JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ 
        FOR SYSTEM_TIME AS OF A.proctime AS B 
        ON B.fence=ST_MakePoint(A.x,A.y);
    );

    其中,Lindorm宽表HBase Java API访问地址、宽表引擎用户名及密码的获取方式如下:image

步骤四:查看结果

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 执行以下语句,查看计算结果。

    SELECT * FROM fresult;

    返回结果:

    +-----+-----------+-----+
    | uID |   rName   | rID |
    +-----+-----------+-----+
    | A   | SoHo      | 1   |
    | B   | Chinatown | 2   |
    | C   | SoHo      | 1   |
    | D   | SoHo      | 1   |
    | E   | Chinatown | 2   |
    | F   | Tribeca   | 3   |
    | G   | Chinatown | 2   |
    | H   | Chinatown | 2   |
    +-----+-----------+-----+

    返回结果显示各个车辆当前属于哪个区域,例如车辆A目前位于区域1,即SoHo。