GeoMesa是由locationtech开源的一套地理大数据处理工具套件。本文主要介绍如何通过DLA Ganos查询基于GeoMesa管理的HBase和Cassandra数据库。

通过GeoMesa您可以在NoSQL分布式计算存储系统上进行大规模的地理空间查询和分析。GeoMesa目前支持的NoSQL数据库包括Accumulo、HBase、Google Bigtable和Cassandra等。有关GeoMesa的详细介绍请参见:geomesa官方文档

说明 如果您想通过DLA Ganos查询基于GeoMesa管理的其他数据库,可以参见GeoMesa文档修改相关参数即可。详细代码请参见:DLA Ganos GitHub样例库

Lindorm(HBase)

  1. 初始化SparkSession
    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._
    
    //SparkSession加载JTS包用于处理时空数据。
    spark.withJTS 
    val sc = spark.sparkContext
  2. 配置HBase连接参数,并通过SparkSQL加载数据。
    
    //指定HBase连接参数,POINT为Catalog名称。
    val params = Map(
          "hbase.catalog" -> "AIS",
          "hbase.zookeepers" -> "zookeeper地址",
    
    //加载AIS数据源。
    val dataFrame = spark.read
          .format("ganos-geometry")
          .options(params)
          .option("ganos.feature", "testpoints")
          .load()
    
     dataFrame.createOrReplaceTempView("testpoints")
     //创建SQL查询。
     val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)")
     
     //输出Schema与表内容。
     points.printSchema
     points.show
  3. 输出结果
    root
     |-- __fid__: string (nullable = false)
     |-- name: string (nullable = true)
     |-- attr: string (nullable = true)
     |-- dtg: timestamp (nullable = true)
     |-- geom: point (nullable = true)
     
    +-------+-----+-----+-------------------+-------------+
    |__fid__| name| attr|                dtg|         geom|
    +-------+-----+-----+-------------------+-------------+
    |      2|name2|name2|2014-01-03 08:00:01|POINT (42 52)|
    |      3|name3|name3|2014-01-04 08:00:01|POINT (43 53)|
    |      4|name4|name4|2014-01-05 08:00:01|POINT (44 54)|
    |      5|name5|name5|2014-01-06 08:00:01|POINT (45 55)|
    |      6|name6|name6|2014-01-07 08:00:01|POINT (46 56)|
    |      7|name7|name7|2014-01-08 08:00:01|POINT (47 57)|
    |      8|name8|name8|2014-01-09 08:00:01|POINT (48 58)|
    |      9|name9|name9|2014-01-10 08:00:01|POINT (49 59)|
    +-------+-----+-----+-------------------+-------------+
注意 通过DLA Ganos将dataFrame对象写入数据库,然后再通过DLA Ganos将DataFrame回写到HBase数据库前,必须先创建好待写入的数据表,否则将写入失败。
points.write.format("ganos-geometry").options(dsParams).option("ganos.feature", "testpoints").save()

Cassandra

  1. 初始化SparkSession
    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._
    
    //SparkSession加载JTS包用于处理时空数据。
    spark.withJTS 
    val sc = spark.sparkContext
  2. 配置连接参数
    val dsParams = Map(
        "cassandra.contact.point" -> "Cassandra连接地址:9042",
        "cassandra.keyspace" -> "ganos",
        "cassandra.catalog"->"test_sft"
      )
    
    //创建DataFrame对应到Cassandra表。
      val dataFrame = spark.read
        .format("ganos-geometry")
        .options(dsParams)
        .option("ganos.feature", "testpoints")
        .load()
      
     dataFrame.createOrReplaceTempView("testpoints")
     //创建SQL查询。
     val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)")
     
     //输出Schema与表内容。
     points.printSchema
     points.show
  3. 输出结果
    root
     |-- __fid__: string (nullable = false)
     |-- name: string (nullable = true)
     |-- attr: string (nullable = true)
     |-- dtg: timestamp (nullable = true)
     |-- geom: point (nullable = true)
     
    +-------+-----+-----+-------------------+-------------+
    |__fid__| name| attr|                dtg|         geom|
    +-------+-----+-----+-------------------+-------------+
    |      2|name2|name2|2014-01-03 08:00:01|POINT (42 52)|
    |      3|name3|name3|2014-01-04 08:00:01|POINT (43 53)|
    |      4|name4|name4|2014-01-05 08:00:01|POINT (44 54)|
    |      5|name5|name5|2014-01-06 08:00:01|POINT (45 55)|
    |      6|name6|name6|2014-01-07 08:00:01|POINT (46 56)|
    |      7|name7|name7|2014-01-08 08:00:01|POINT (47 57)|
    |      8|name8|name8|2014-01-09 08:00:01|POINT (48 58)|
    |      9|name9|name9|2014-01-10 08:00:01|POINT (49 59)|
    +-------+-----+-----+-------------------+-------------+
注意 您通过DLA Ganos将dataFrame对象写入数据库,再通过DLA Ganos将DataFrame回写到HBase数据库前,必须先创建好待写入的表,否则将写入失败。
points.write.format("ganos-geometry").options(dsParams).option("ganos.feature", "testpoints").save()