GeoMesa是由locationtech开源的一套地理大数据处理工具套件。本文主要介绍如何通过DLA Ganos查询基于GeoMesa管理的HBase和Cassandra数据库。
通过GeoMesa您可以在NoSQL分布式计算存储系统上进行大规模的地理空间查询和分析。GeoMesa目前支持的NoSQL数据库包括Accumulo、HBase、Google Bigtable和Cassandra等。有关GeoMesa的详细介绍请参见:geomesa官方文档。
说明 如果您想通过DLA Ganos查询基于GeoMesa管理的其他数据库,可以参见GeoMesa文档修改相关参数即可。详细代码请参见:DLA Ganos GitHub样例库。
Lindorm(HBase)
- 初始化SparkSession
val spark = SparkSession.builder .appName("Simple Application") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.crossJoin.enabled", "true") .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName) .getOrCreate() import spark.implicits._ //SparkSession加载JTS包用于处理时空数据。 spark.withJTS val sc = spark.sparkContext
- 配置HBase连接参数,并通过SparkSQL加载数据。
//指定HBase连接参数,POINT为Catalog名称。 val params = Map( "hbase.catalog" -> "AIS", "hbase.zookeepers" -> "zookeeper地址", //加载AIS数据源。 val dataFrame = spark.read .format("ganos-geometry") .options(params) .option("ganos.feature", "testpoints") .load() dataFrame.createOrReplaceTempView("testpoints") //创建SQL查询。 val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)") //输出Schema与表内容。 points.printSchema points.show
- 输出结果
root |-- __fid__: string (nullable = false) |-- name: string (nullable = true) |-- attr: string (nullable = true) |-- dtg: timestamp (nullable = true) |-- geom: point (nullable = true) +-------+-----+-----+-------------------+-------------+ |__fid__| name| attr| dtg| geom| +-------+-----+-----+-------------------+-------------+ | 2|name2|name2|2014-01-03 08:00:01|POINT (42 52)| | 3|name3|name3|2014-01-04 08:00:01|POINT (43 53)| | 4|name4|name4|2014-01-05 08:00:01|POINT (44 54)| | 5|name5|name5|2014-01-06 08:00:01|POINT (45 55)| | 6|name6|name6|2014-01-07 08:00:01|POINT (46 56)| | 7|name7|name7|2014-01-08 08:00:01|POINT (47 57)| | 8|name8|name8|2014-01-09 08:00:01|POINT (48 58)| | 9|name9|name9|2014-01-10 08:00:01|POINT (49 59)| +-------+-----+-----+-------------------+-------------+
注意 通过DLA Ganos将dataFrame对象写入数据库,然后再通过DLA Ganos将DataFrame回写到HBase数据库前,必须先创建好待写入的数据表,否则将写入失败。
points.write.format("ganos-geometry").options(dsParams).option("ganos.feature", "testpoints").save()
Cassandra
- 初始化SparkSession
val spark = SparkSession.builder .appName("Simple Application") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.crossJoin.enabled", "true") .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName) .getOrCreate() import spark.implicits._ //SparkSession加载JTS包用于处理时空数据。 spark.withJTS val sc = spark.sparkContext
- 配置连接参数
val dsParams = Map( "cassandra.contact.point" -> "Cassandra连接地址:9042", "cassandra.keyspace" -> "ganos", "cassandra.catalog"->"test_sft" ) //创建DataFrame对应到Cassandra表。 val dataFrame = spark.read .format("ganos-geometry") .options(dsParams) .option("ganos.feature", "testpoints") .load() dataFrame.createOrReplaceTempView("testpoints") //创建SQL查询。 val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)") //输出Schema与表内容。 points.printSchema points.show
- 输出结果
root |-- __fid__: string (nullable = false) |-- name: string (nullable = true) |-- attr: string (nullable = true) |-- dtg: timestamp (nullable = true) |-- geom: point (nullable = true) +-------+-----+-----+-------------------+-------------+ |__fid__| name| attr| dtg| geom| +-------+-----+-----+-------------------+-------------+ | 2|name2|name2|2014-01-03 08:00:01|POINT (42 52)| | 3|name3|name3|2014-01-04 08:00:01|POINT (43 53)| | 4|name4|name4|2014-01-05 08:00:01|POINT (44 54)| | 5|name5|name5|2014-01-06 08:00:01|POINT (45 55)| | 6|name6|name6|2014-01-07 08:00:01|POINT (46 56)| | 7|name7|name7|2014-01-08 08:00:01|POINT (47 57)| | 8|name8|name8|2014-01-09 08:00:01|POINT (48 58)| | 9|name9|name9|2014-01-10 08:00:01|POINT (49 59)| +-------+-----+-----+-------------------+-------------+
注意 您通过DLA Ganos将dataFrame对象写入数据库,再通过DLA Ganos将DataFrame回写到HBase数据库前,必须先创建好待写入的表,否则将写入失败。
points.write.format("ganos-geometry").options(dsParams).option("ganos.feature", "testpoints").save()