本实例展示如何将Hive数据加载到DLA Ganos进行分析。
Hive是Hadoop生态系统中的一个被广泛使用的数据仓库工具,主要用来进行Hadoop中的大规模数据的提取、转化、加载、查询和分析等操作。Hive数据仓库工具能将存储在HDFS系统中的结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能将SQL语句转变成Map/Reduce任务来执行。
操作步骤
- 初始化Spark。
val spark: SparkSession = SparkSession.builder() .config("hive.metastore.uris", hiveMetastoreUris) .config("hive.sql.warehouse.dir", hiveWarehouseDir) .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.crossJoin.enabled", "true") .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName) .enableHiveSupport() .getOrCreate() //SparkSession加载JTS包用于处理时空数据 spark.withJTS import spark.implicits._ val sc = spark.sparkContext
- 加载Hive数据。
val tableName = args(0) //从Hive中读取表 tableName val dfFromHive = sparkSession.sql( s""" |select * from $tableName |""".stripMargin)
- 创建时空Geometry对象。
val ganosDF=dfFromHive.withColumn("geom",st_makePoint(col("x"),col("y"))) ganosDF.show
这里“x”与“y”代表每条记录中空间对象的横纵坐标值。
- 时空查询。
ganosDF.createOrReplaceTempView("testpoints") //创建SQL查询 val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)")