区域统计

Lindorm Ganos时空服务提供了丰富的函数和数据类型,方便您对时空数据进行计算和分析。您可以通过Ganos时空服务预先设定地理围栏,并根据业务需求灵活使用时空函数,结合Lindorm流引擎的实时计算能力,实现基于地理围栏的实时区域统计功能。

背景信息

在很多基于时空位置的场景中,都有根据地理区域范围进行聚合查询的需求,进而形成热力图进行展示,例如:

  • 在互联网出行场景中,需要统计每个行政区(或网格)的车辆分布情况,为平衡运力调度提供依据。

  • LBS客户运营场景中,需要统计每个区域内(AOI)的客户流量,以此来制定优惠券的发放规则。

  • 在车联网场景中,需要统计车辆在某个区域内的聚集情况,可用于大屏密度图展示。

image.png

技术实现

区域统计涉及以下两种数据:

  • 地理围栏:边界是一个闭合的环且不会频繁变更,可以使用Polygon类型将数据保存在Lindorm宽表中。

  • 车辆位置:车辆位置信息是实时上传的流数据,可以实时保存在Kafka Topic中。

Lindorm流引擎将读取Kafka Topic中的实时数据,基于地理围栏进行实时计算,并将计算结果保存在Lindorm宽表中。

image.png

前提条件

注意事项

如果应用部署在ECS实例,通过专有网络访问Lindorm实例前,需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

  • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

  • ECS实例与Lindorm实例属于同一专有网络。

步骤一:创建地理围栏表和结果表

宽表引擎中创建地理围栏表和结果表,分别用于保存地理围栏数据和计算结果。

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 创建地理围栏表并插入示例数据。

    1. 创建地理围栏表regions。

      CREATE TABLE regions(rID INT, rName VARCHAR, fence GEOMETRY, PRIMARY KEY(rID));
    2. 插入地理围栏数据,包括3个区域,分别命名为SoHo、ChinatownTribeca。

      INSERT INTO regions(rID, rName, fence) VALUES 
      (1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')),
      (2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')),
      (3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));
  3. 创建结果表cresult。

    CREATE TABLE cresult(rName VARCHAR, ws TIMESTAMP, we TIMESTAMP, carCount BIGINT, PRIMARY KEY(rName,ws));

步骤二:写入流数据

Lindorm流引擎完全兼容开源Kafka API,您可以通过Kafka开源客户端或脚本工具连接Lindorm流引擎并写入测试数据。

以通过开源Kafka脚本工具写入为例。

  1. 下载并安装Kafka脚本工具。具体操作,请参见通过开源Kafka脚本工具连接Lindorm流引擎

  2. 创建名为logVehicleTsKafka Topic。

    bin/kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicleTs --create

    其中,Lindorm Stream Kafka地址为流引擎的Kafka连接地址,仅支持通过专有网络访问。获取方式,请参见查看流引擎连接地址

  3. 将测试数据写入Kafka Topic中,使用组合键Ctrl+C可终止写入。

    bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicleTs
    {"UID": "A", "x":"-74.00035", "y": "40.72432", "tripTime":"2020-01-01 08:00:00"}
    {"UID": "B", "x":"-74.00239", "y": "40.71692", "tripTime":"2020-01-01 08:00:30"}
    {"UID": "C", "x":"-74.00201", "y": "40.72563", "tripTime":"2020-01-01 08:01:00"}
    {"UID": "D", "x":"-74.00158", "y": "40.72412", "tripTime":"2020-01-01 08:01:30"}
    {"UID": "E", "x":"-73.99836", "y": "40.71588", "tripTime":"2020-01-01 08:02:00"}
    {"UID": "F", "x":"-74.01015", "y": "40.71422", "tripTime":"2020-01-01 08:02:30"}
    {"UID": "G", "x":"-73.99183", "y": "40.71451", "tripTime":"2020-01-01 08:03:00"}
    {"UID": "H", "x":"-73.99595", "y": "40.71773", "tripTime":"2020-01-01 08:03:30"}

    您可以使用bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicleTs --from-beginning命令,查看数据是否成功写入。

步骤三:提交流引擎计算任务

使用Flink SQL提交Lindorm流引擎计算任务,读取Kafka Topic中的数据,并结合地理围栏数据进行计算。

  1. 连接Lindorm流引擎。如何连接,请参见步骤二:安装流引擎客户端

  2. 提交计算任务。

    计算任务构造了一个大小为10分钟的滚动窗口,并每10分钟统计一次窗口内的数据,具体步骤如下:

    1. 加载ganos函数模块。

    2. Flink Job中创建三张表:数据源表logCarWithTs、数据维表regions、数据结果表cresult,通过设置连接器参数,分别关联已创建的Kafka Topic、地理围栏表regions和结果表cresult。

    3. 创建流任务,通过统计函数count、关系函数ST_Contains和时间窗口函数TUMBLE对数据进行过滤,并将计算结果写入结果表cresult。

    示例代码如下:

    CREATE FJOB fenceWs (
      LOAD MODULE ganos;
      CREATE TABLE logCarWithTs(
        `uID` STRING,
        `x` DOUBLE,
        `y` DOUBLE,
        `tripTime` TIMESTAMP(0),
        WATERMARK for `tripTime` AS `tripTime`-INTERVAL '1' MINUTES 
      ) WITH ('connector'='kafka',
              'topic'='logVehicleTs',
              'scan.startup.mode'='earliest-offset',
              'properties.bootstrap.servers'='<Lindorm Stream Kafka地址>',
              'format'='json');
      -- create area table
      CREATE TABLE regions(
        `rID` INT,
        `rName` STRING,
        `fence` GEOMETRY,
        PRIMARY KEY (`rID`) NOT ENFORCED
      ) WITH ('connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='root',
        'password'='test_password',
        'tableName'='regions',
        'namespace'='default');
      -- create result table
      CREATE TABLE cresult(
        `rName` STRING,
        `ws` TIMESTAMP(0),
        `we` TIMESTAMP(0),
        `carCount` BIGINT,
        PRIMARY KEY (`rName`, `ws`) NOT ENFORCED
      ) WITH ('connector'='lindorm',
        'seedServer'='<Lindorm宽表HBase Java API访问地址>',
        'userName'='root',
        'password'='test_password',
        'tableName'='cresult',
        'namespace'='default');
    -- count cars in each area every 10 minutes
    INSERT INTO cresult
      SELECT regions.rName AS rName,
      window_start AS ws,
      window_end AS we,
      count(*) AS carCount
      FROM TABLE(TUMBLE(TABLE logCarWithTs, DESCRIPTOR(tripTime),INTERVAL '10' MINUTES))  
      JOIN regions 
      ON ST_Contains(regions.fence,ST_MakePoint(x,y)) 
      GROUP BY regions.rName,window_start, window_end;
    );

    其中,Lindorm宽表HBase Java API访问地址的获取方式,请参见查看宽表引擎连接地址

    说明

    计算任务中使用到的函数,请参见关系函数Count函数TUMBLE窗口函数

步骤四:查看结果

  1. 通过Lindorm-cli连接并使用宽表引擎

  2. 执行以下语句,查看区域统计结果。

    SELECT rName, carCount FROM cresult;

    返回结果:

    +-----------+----------+
    |   rName   | carCount |
    +-----------+----------+
    | Chinatown | 3        |
    | SoHo      | 4        |
    | Tribeca   | 3        |
    +-----------+----------+

    carCount为各区域中车辆的数量。