GeoSpark Range Query Source Code Analysis
GeoSpark
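GeoSpark is an open-source library for spatial data processing built on Spark. It adds spatial operations on top of the RDD model and builds components for spatial analysis, spatial SQL, and spatial data visualization on the underlying SpatialRDD. For details, see the author's blog https://jiayuasu.github.io/ and the project homepage http://sedona.apache.org. GeoSpark started as a third-party Spark component, was later renamed Sedona and submitted to the Apache Software Foundation, and is currently (November 2020) incubating there.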
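For spatial indexing and parallel access, GeoSpark does not build file-level indexes directly on HDFS the way SpatialHadoop does. Instead, it reads the data into an RDD and partitions and indexes it in memory; the indexed data can be persisted to disk so it does not have to be read again next time. As a consequence, memory size limits, to some extent, how much data can be processed in a single run.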
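I have recently been working on using Spark to make SpatialHadoop's design more efficient, so I looked at how GeoSpark handles common spatial operations. These are my notes on its logic for reading, indexing, and partitioning spatial data.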
Overview of the Main Logic
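Most of GeoSpark's code is written directly in Java against Spark's Java API, and the overall logic is simpler than I expected. The comments, indentation, and naming also seem a bit unconventional in places.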
The example code mainly follows the official tutorial http://sedona.apache.org/tutorial/rdd/ and the source code in the GitHub repository.
Reading a CSV File and Creating a PointRDD
Official example: suppose we have a CSV file checkin.csv at path /Download/checkin.csv with the following content:
-88.331492,32.324142,hotel
-88.175933,32.360763,gas
-88.388954,32.357073,bar
-88.221102,32.35078,restaurant
This file has three columns, and the corresponding offsets (column IDs) are 0, 1, and 2. Use the following code to create a PointRDD:
val pointRDDInputLocation = "/Download/checkin.csv"
val pointRDDOffset = 0 // The point long/lat starts from Column 0
val pointRDDSplitter = FileDataSplitter.CSV
val carryOtherAttributes = true // Carry Column 2 (hotel, gas, bar...)
var objectRDD = new PointRDD(sc, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, carryOtherAttributes)
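Point data is handled by PointRDD, a subclass of SpatialRDD. For a CSV file it:
- reads the text file with sparkContext.textFile;
- processes the lines partition by partition with mapPartitions, where PointFormatMapper parses each line of text into a point object.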
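The two steps above can be sketched in plain Java without Spark. This is a minimal, hypothetical stand-in for PointFormatMapper, assuming the coordinates sit in columns offset and offset + 1 and that carrying the other attributes means keeping the remaining columns as one delimiter-joined string. ParsedPoint and CsvPointParser are illustrative names, not GeoSpark classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for a JTS Point plus its userData; not a GeoSpark class.
final class ParsedPoint {
    final double x;
    final double y;
    final String userData; // the other columns, or "" when they are not carried

    ParsedPoint(double x, double y, String userData) {
        this.x = x;
        this.y = y;
        this.userData = userData;
    }
}

final class CsvPointParser {
    // Parses one line: column [offset] is x (longitude), column [offset + 1] is y (latitude).
    static ParsedPoint parse(String line, int offset, String delimiter, boolean carryOtherAttributes) {
        String[] cols = line.split(Pattern.quote(delimiter), -1);
        double x = Double.parseDouble(cols[offset].trim());
        double y = Double.parseDouble(cols[offset + 1].trim());
        String extra = "";
        if (carryOtherAttributes) {
            List<String> rest = new ArrayList<>();
            for (int i = 0; i < cols.length; i++) {
                if (i != offset && i != offset + 1) {
                    rest.add(cols[i]);
                }
            }
            extra = String.join(delimiter, rest);
        }
        return new ParsedPoint(x, y, extra);
    }

    // Same shape as a mapPartitions function: one partition's lines in, its points out.
    static Iterator<ParsedPoint> parsePartition(Iterator<String> lines, int offset,
                                                String delimiter, boolean carryOtherAttributes) {
        List<ParsedPoint> result = new ArrayList<>();
        while (lines.hasNext()) {
            result.add(parse(lines.next(), offset, delimiter, carryOtherAttributes));
        }
        return result.iterator();
    }
}
```

In Spark, parsePartition is the part that would run once per partition; the driver never sees individual lines.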
RangeQuery: Range Query
To find the points of a PointRDD that fall within a range, GeoSpark provides two implementations: one for indexed data and one for non-indexed data.
// Query without an index
val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01)
val considerBoundaryIntersection = false // Only return geometries fully covered by the window
val usingIndex = false
var queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerBoundaryIntersection, usingIndex)
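For point data without a spatial index, GeoSpark simply applies the filter operator, and the RangeFilter class decides whether each point lies within the range.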
// Build a spatial index and query with it
val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01)
val considerBoundaryIntersection = false // Only return geometries fully covered by the window
val buildOnSpatialPartitionedRDD = false // Set to TRUE only if running a join query
spatialRDD.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
val usingIndex = true
var queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerBoundaryIntersection, usingIndex)
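For point data with a spatial index (such as a quadtree), an index is first built per partition: a mapPartitions operator inserts all points of a partition into an STR-tree or quadtree, and the result is stored in indexedRawRDD. The range query then runs mapPartitions over indexedRawRDD, one partition at a time.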
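The two phases described above (build one index per partition, then query each partition's index and concatenate the results) can be sketched in plain Java, with a List of Lists standing in for the RDD's partitions. SortedXIndex is a hypothetical, deliberately simple index (points sorted by x plus a binary search), not GeoSpark's STRtree or Quadtree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical per-partition index: points {x, y} sorted by x. Stands in for STRtree/Quadtree.
final class SortedXIndex {
    private final double[][] pts;

    SortedXIndex(List<double[]> partition) {
        pts = partition.toArray(new double[0][]);
        Arrays.sort(pts, Comparator.comparingDouble((double[] p) -> p[0]));
    }

    // Points with minX <= x <= maxX and minY <= y <= maxY.
    List<double[]> query(double minX, double maxX, double minY, double maxY) {
        List<double[]> out = new ArrayList<>();
        for (int i = lowerBound(minX); i < pts.length && pts[i][0] <= maxX; i++) {
            if (pts[i][1] >= minY && pts[i][1] <= maxY) {
                out.add(pts[i]);
            }
        }
        return out;
    }

    // First position whose x is >= key (binary search).
    private int lowerBound(double key) {
        int lo = 0;
        int hi = pts.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (pts[mid][0] < key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }
}

final class PerPartitionRangeQuery {
    // Phase 1 (like buildIndex): exactly one index per partition.
    static List<SortedXIndex> buildIndexes(List<List<double[]>> partitions) {
        List<SortedXIndex> indexes = new ArrayList<>();
        for (List<double[]> part : partitions) {
            indexes.add(new SortedXIndex(part));
        }
        return indexes;
    }

    // Phase 2 (like mapPartitions over indexedRawRDD): query every index, concatenate.
    static List<double[]> rangeQuery(List<SortedXIndex> indexes,
                                     double minX, double maxX, double minY, double maxY) {
        List<double[]> result = new ArrayList<>();
        for (SortedXIndex idx : indexes) {
            result.addAll(idx.query(minX, maxX, minY, maxY));
        }
        return result;
    }
}
```

The window arguments follow the same (minX, maxX, minY, maxY) order as the Envelope in the examples above.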
SpatialPartitioning: Spatial Partitioning
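The index construction and analysis above rely on the RDD partitions that Spark creates directly from the file layout when reading the data. When nearby data should end up in the same partition to speed up neighborhood queries, consider repartitioning the data with spatial partitioning.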
objectRDD.spatialPartitioning(GridType.KDBTREE)
queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
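GeoSpark provides several partitioning methods, such as KDB-tree, R-tree, and Voronoi diagram, with broadly similar logic. To reduce the overall computation, GeoSpark does not compute the partitioning on the full raw data; it first samples a fraction of the data and builds the spatial partitioning from the sample. It then applies three Spark operators: flatMapToPair converts each record of rawSpatialRDD into a (partition id, spatial object) pair, partitionBy redistributes the data by partition id, and finally mapPartitions extracts the spatial object from each (partition id, spatial object) pair.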
Detailed Source Code Analysis
Reading a CSV File and Creating a PointRDD
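PointRDD is a class written in Java; its constructor is shown below. It reads the text file with sparkContext.textFile and then processes it line by line inside mapPartitions, where PointFormatMapper parses each line into a point object: using the delimiter splitter and the coordinate column Offset, it extracts the coordinates and any extra attributes from the line.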
/**
* Instantiates a new point RDD.
*
* @param sparkContext the spark context
* @param InputLocation the input location
* @param Offset the offset
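* @param splitter the splitter, i.e. the field delimiter (e.g. ',' in CSV)
* @param carryInputData the carry input data: whether to keep the data other than the coordinates (a boolean; this comment is rather confusing)
* @param partitions the partitions: the number of partitions (numOfPartitions would be a better name?)
* @param newLevel the new level (i.e. newStorageLevel)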
* @param sourceEpsgCRSCode the source epsg CRS code
* @param targetEpsgCode the target epsg code
*/
public PointRDD(JavaSparkContext sparkContext, String InputLocation, Integer Offset, FileDataSplitter splitter,
boolean carryInputData, Integer partitions, StorageLevel newLevel, String sourceEpsgCRSCode,
String targetEpsgCode) {
JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext
.textFile(InputLocation);
if (Offset != null) {
this.setRawSpatialRDD(
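// The textFile call above already produced an RDD of lines
// mapPartitions processes the lines inside each partition
// and parses every line into a concrete geometry object
// The real logic lives in FormatMapper; PointFormatMapper only passes a few parameters, so it feels like a FormatMapperFactory
// In Scala, mapPartitions takes a function; in Java it takes an object with a call method (a FlatMapFunction).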
rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter,
carryInputData)));
} else {
this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(splitter, carryInputData)));
}
if (sourceEpsgCRSCode != null && targetEpsgCode != null) {
this.CRSTransform(sourceEpsgCRSCode,
targetEpsgCode);
}
if (newLevel != null) {
this.analyze(newLevel);
}
if (splitter.equals(FileDataSplitter.GEOJSON)) {
this.fieldNames = FormatMapper.readGeoJsonPropertyNames(rawTextRDD.take(1).get(0).toString());
}
}
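How PointFormatMapper parses point text. Its implementation is a bit odd: it extends FormatMapper and only changes a few constructors, so it feels like a factory for initialization. The actual work happens in FormatMapper's call method.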
public class PointFormatMapper
extends FormatMapper
{
public PointFormatMapper(FileDataSplitter Splitter, boolean carryInputData)
{
super(0, 1, Splitter, carryInputData, GeometryType.POINT);
}
//...
}
// In class FormatMapper
@Override
public Iterator<T> call(Iterator<String> stringIterator)
throws Exception
{
List<T> result = new ArrayList<>();
while (stringIterator.hasNext()) {
String line = stringIterator.next();
// Parse each line into a Geometry object
addGeometry(readGeometry(line), result);
}
return result.iterator();
}
public Geometry readGeometry(String line)
throws ParseException
{
Geometry geometry = null;
//...
geometry = createGeometry(readCoordinates(line), geometryType);
// ...
return geometry;
}
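Here readCoordinates(line) extracts the coordinates of one or more points from the line (returning a Coordinate[]), and createGeometry builds the concrete Geometry object from those coordinates and the geometry type.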
private Geometry createGeometry(Coordinate[] coordinates, GeometryType geometryType)
{
GeometryFactory geometryFactory = new GeometryFactory();
Geometry geometry = null;
switch (geometryType) {
case POINT:
geometry = geometryFactory.createPoint(coordinates[0]);
break;
case POLYGON:
geometry = geometryFactory.createPolygon(coordinates);
break;
// ... other geometry types
}
return geometry;
}
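geometryFactory.createPoint first wraps the Coordinate in a CoordinateSequence and then creates a Point object, so every object carries noticeable extra overhead. Point extends Geometry, which, not counting static fields, already holds a bounding box (envelope), the factory that created it, a spatial reference system ID (SRID), and ancillary user data (userData). Point adds a CoordinateSequence member on top of that to store the actual coordinates. In my earlier tests, when a point has only (x, y), this representation wastes a lot of memory (JVM object overhead plus the extra fields above); storing just a coordinate array would be more efficient.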
public abstract class Geometry
implements Cloneable, Comparable, Serializable
{
/**
* The bounding box of this <code>Geometry</code>.
*/
protected Envelope envelope;
/**
* The {@link GeometryFactory} used to create this Geometry
*/
protected final GeometryFactory factory;
/**
* The ID of the Spatial Reference System used by this <code>Geometry</code>
*/
protected int SRID;
/**
* An object reference which can be used to carry ancillary data defined
* by the client.
*/
private Object userData = "";
/**
* Creates a new <code>Geometry</code> via the specified GeometryFactory.
*
* @param factory
*/
public Geometry(GeometryFactory factory) {
this.factory = factory;
this.SRID = factory.getSRID();
}
// ...
}
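To make the memory argument concrete, here is a hypothetical compact point store (PackedPoints is an illustrative name, not part of GeoSpark or JTS) that keeps all x values in one double[] and all y values in another. The payload is 16 bytes per (x, y) point plus two array headers, instead of several objects per point; the trade-off is that userData, SRID, and the JTS geometry operations are lost.

```java
import java.util.Arrays;

// Hypothetical compact store: parallel primitive arrays instead of one Point object per point.
final class PackedPoints {
    private double[] xs;
    private double[] ys;
    private int size;

    PackedPoints(int initialCapacity) {
        xs = new double[Math.max(1, initialCapacity)];
        ys = new double[xs.length];
    }

    void add(double x, double y) {
        if (size == xs.length) { // grow both arrays by doubling
            xs = Arrays.copyOf(xs, size * 2);
            ys = Arrays.copyOf(ys, size * 2);
        }
        xs[size] = x;
        ys[size] = y;
        size++;
    }

    int size() {
        return size;
    }

    double x(int i) {
        checkIndex(i);
        return xs[i];
    }

    double y(int i) {
        checkIndex(i);
        return ys[i];
    }

    // Counts points inside [minX, maxX] x [minY, maxY] with a loop over primitives only.
    int countInWindow(double minX, double maxX, double minY, double maxY) {
        int n = 0;
        for (int i = 0; i < size; i++) {
            if (xs[i] >= minX && xs[i] <= maxX && ys[i] >= minY && ys[i] <= maxY) {
                n++;
            }
        }
        return n;
    }

    private void checkIndex(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException("index " + i);
        }
    }
}
```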
RangeQuery: Range Query
Example usage in GeoSpark:
val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01)
val considerBoundaryIntersection = false // Only return geometries fully covered by the window
val usingIndex = false
var queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerBoundaryIntersection, usingIndex)
RangeQuery comes in two forms, indexed and non-indexed, selected by the useIndex parameter passed to SpatialRangeQuery.
public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
throws Exception
{
U queryGeometry = originalQueryGeometry;
if (spatialRDD.getCRStransformation()) { // coordinate reference system (CRS) transformation
queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
}
if (useIndex == true) {
if (spatialRDD.indexedRawRDD == null) {
throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
}
return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
}
else {
return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
}
}
// In class RangeFilter (used by the non-indexed branch above):
public Boolean call(T geometry)
{
if (leftCoveredByRight) {
return match(geometry, queryGeometry);
}
else {
return match(queryGeometry, geometry);
}
}
public boolean match(Geometry spatialObject, Geometry queryWindow)
{
if (considerBoundaryIntersection) {
if (queryWindow.intersects(spatialObject)) { return true; }
}
else {
if (queryWindow.covers(spatialObject)) { return true; }
}
return false;
}
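For rectangles the two predicates in match reduce to simple comparisons, which makes the effect of considerBoundaryIntersection easy to see. A minimal sketch in plain Java, assuming axis-aligned rectangles only (Rect and RangeMatch are hypothetical names; GeoSpark itself delegates to JTS covers and intersects, which work on arbitrary geometries). For points the two predicates agree, including points on the window's boundary, so the flag matters mainly for lines and polygons.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical axis-aligned rectangle, standing in for a JTS geometry.
final class Rect {
    final double minX, maxX, minY, maxY;

    // Same argument order as new Envelope(x1, x2, y1, y2) in the examples above.
    Rect(double minX, double maxX, double minY, double maxY) {
        this.minX = minX;
        this.maxX = maxX;
        this.minY = minY;
        this.maxY = maxY;
    }

    // Every point of o lies in this rectangle, boundary included (like covers).
    boolean covers(Rect o) {
        return o.minX >= minX && o.maxX <= maxX && o.minY >= minY && o.maxY <= maxY;
    }

    // The rectangles share at least one point; touching counts (like intersects).
    boolean intersects(Rect o) {
        return o.minX <= maxX && o.maxX >= minX && o.minY <= maxY && o.maxY >= minY;
    }
}

final class RangeMatch {
    // Same decision as match(spatialObject, queryWindow) in the excerpt above.
    static boolean match(Rect spatialObject, Rect queryWindow, boolean considerBoundaryIntersection) {
        return considerBoundaryIntersection
                ? queryWindow.intersects(spatialObject)
                : queryWindow.covers(spatialObject);
    }

    // Non-indexed range query: test every object, like filter(new RangeFilter(...)).
    static List<Rect> filter(List<Rect> objects, Rect window, boolean considerBoundaryIntersection) {
        List<Rect> out = new ArrayList<>();
        for (Rect r : objects) {
            if (match(r, window, considerBoundaryIntersection)) {
                out.add(r);
            }
        }
        return out;
    }
}
```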
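To run an indexed query, first build the index and then set the flag to use it at query time. The following code uses a quadtree index as an example.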
val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01)
val considerBoundaryIntersection = false // Only return geometries fully covered by the window
val buildOnSpatialPartitionedRDD = false // Set to TRUE only if running a join query
spatialRDD.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
val usingIndex = true
var queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerBoundaryIntersection, usingIndex)
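For indexed data, a mapPartitions operator processes spatialRDD.indexedRawRDD, in which each record is one treeIndex (IndexBuilder emits exactly one index per partition). treeIndex.query(Envelope searchEnv) returns a List of candidates for the query's envelope, and each candidate is then checked against the exact predicate in match. This refinement is needed because the envelope lookup only narrows the search: JTS documents that Quadtree.query may return items that do not actually lie in the search envelope.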
// ...
if (useIndex == true) {
if (spatialRDD.indexedRawRDD == null) {
throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
}
return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
}
else {
return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
}
// ...
// In class RangeFilterUsingIndex:
@Override
public Iterator<T> call(Iterator<SpatialIndex> treeIndexes)
throws Exception
{
assert treeIndexes.hasNext() == true;
SpatialIndex treeIndex = treeIndexes.next();
List<T> results = new ArrayList<T>();
List<T> tempResults = treeIndex.query(this.queryGeometry.getEnvelopeInternal());
for (T tempResult : tempResults) {
if (leftCoveredByRight) {
if (match(tempResult, queryGeometry)) {
results.add(tempResult);
}
}
else {
if (match(queryGeometry, tempResult)) {
results.add(tempResult);
}
}
}
return results.iterator();
}
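The "query the index, then re-check every candidate" pattern can be shown with a hypothetical coarse uniform grid (GridIndex, not a GeoSpark or JTS class) standing in for STRtree/Quadtree. Its query returns every point in any cell that overlaps the window, so some candidates lie outside the window and the exact check afterwards is what makes the result correct.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical grid index; like an envelope query on a tree index, it returns candidates only.
final class GridIndex {
    private final double cellSize;
    private final Map<Long, List<double[]>> cells = new HashMap<>();

    GridIndex(double cellSize) {
        this.cellSize = cellSize;
    }

    private long cell(double v) {
        return (long) Math.floor(v / cellSize);
    }

    // Packs two cell coordinates into one key (unique while they fit in an int).
    private static long key(long cx, long cy) {
        return (cx << 32) ^ (cy & 0xffffffffL);
    }

    void insert(double x, double y) {
        cells.computeIfAbsent(key(cell(x), cell(y)), k -> new ArrayList<>()).add(new double[]{x, y});
    }

    // Candidate step: all points in every cell that overlaps the window.
    List<double[]> query(double minX, double maxX, double minY, double maxY) {
        List<double[]> candidates = new ArrayList<>();
        for (long cx = cell(minX); cx <= cell(maxX); cx++) {
            for (long cy = cell(minY); cy <= cell(maxY); cy++) {
                List<double[]> bucket = cells.get(key(cx, cy));
                if (bucket != null) {
                    candidates.addAll(bucket);
                }
            }
        }
        return candidates;
    }
}

final class IndexedRangeQuery {
    // Index lookup, then an exact check per candidate, like the loop in RangeFilterUsingIndex.call.
    static List<double[]> query(GridIndex index, double minX, double maxX, double minY, double maxY) {
        List<double[]> results = new ArrayList<>();
        for (double[] p : index.query(minX, maxX, minY, maxY)) {
            if (p[0] >= minX && p[0] <= maxX && p[1] >= minY && p[1] <= maxY) {
                results.add(p);
            }
        }
        return results;
    }
}
```

With a cell size of 10, a window [0, 5] x [0, 5] touches only cell (0, 0), so a point at (9, 9) comes back as a candidate and is discarded by the refinement.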
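Indexes are built by buildIndex, which seems simpler than I expected: indexing is still done per partition, with a mapPartitions operator building one index over the spatial objects of each partition.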
/**
* Builds the index.
*
* @param indexType the index type
* @param buildIndexOnSpatialPartitionedRDD the build index on spatial partitioned RDD
* @throws Exception the exception
*/
public void buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD)
throws Exception
{
if (buildIndexOnSpatialPartitionedRDD == false) {
//This index is built on top of unpartitioned SRDD
this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType));
}
else {
if (this.spatialPartitionedRDD == null) {
throw new Exception("[AbstractSpatialRDD][buildIndex] spatialPartitionedRDD is null. Please do spatial partitioning before build index.");
}
this.indexedRDD = this.spatialPartitionedRDD.mapPartitions(new IndexBuilder(indexType));
}
}
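IndexBuilder supports only two index types: R-tree (JTS STRtree) and quadtree (JTS Quadtree). Any index type other than RTREE falls through to Quadtree.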
@Override
public Iterator<SpatialIndex> call(Iterator<T> objectIterator)
throws Exception
{
SpatialIndex spatialIndex;
if (indexType == IndexType.RTREE) {
spatialIndex = new STRtree();
}
else {
spatialIndex = new Quadtree();
}
while (objectIterator.hasNext()) {
T spatialObject = objectIterator.next();
spatialIndex.insert(spatialObject.getEnvelopeInternal(), spatialObject);
}
Set<SpatialIndex> result = new HashSet();
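// What is this for? Presumably it forces the index to build now: JTS STRtree only collects items
// on insert and builds its tree on the first query (rejecting inserts afterwards)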
spatialIndex.query(new Envelope(0.0, 0.0, 0.0, 0.0));
result.add(spatialIndex);
return result.iterator();
}
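One plausible reason for the empty-envelope query in IndexBuilder is the index lifecycle: JTS STRtree buffers items on insert, builds its packed tree the first time it is queried, and rejects inserts afterwards. A minimal sketch of that lifecycle in plain Java, with a hypothetical LazyPackedIndex that sorts points by x in place of a real STR packing:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical index mimicking STRtree's lifecycle: buffer on insert, build on first query.
final class LazyPackedIndex {
    private final List<double[]> pending = new ArrayList<>();
    private double[][] packed; // null until the first query builds it

    void insert(double x, double y) {
        if (packed != null) {
            throw new IllegalStateException("cannot insert after the index has been built");
        }
        pending.add(new double[]{x, y});
    }

    boolean isBuilt() {
        return packed != null;
    }

    private void buildIfNeeded() {
        if (packed == null) {
            packed = pending.toArray(new double[0][]);
            Arrays.sort(packed, Comparator.comparingDouble((double[] p) -> p[0]));
            pending.clear();
        }
    }

    List<double[]> query(double minX, double maxX, double minY, double maxY) {
        buildIfNeeded(); // any query, even an empty-envelope one, triggers the build
        List<double[]> out = new ArrayList<>();
        for (double[] p : packed) {
            if (p[0] > maxX) {
                break; // sorted by x: nothing further can match
            }
            if (p[0] >= minX && p[1] >= minY && p[1] <= maxY) {
                out.add(p);
            }
        }
        return out;
    }
}
```

Building before the index leaves mapPartitions means the cost is paid once per partition instead of on the first real query.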
SpatialPartitioning: Spatial Partitioning
objectRDD.spatialPartitioning(GridType.KDBTREE)
queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
public boolean spatialPartitioning(GridType gridType)
throws Exception
{
int numPartitions = this.rawSpatialRDD.rdd().partitions().length;
// Build the spatial partitioning with the RDD's current number of partitions
spatialPartitioning(gridType, numPartitions);
return true;
}
public void spatialPartitioning(GridType gridType, int numPartitions)
throws Exception
{
// The partitioning is computed not on the full raw data but on a sample
int sampleNumberOfRecords = RDDSampleUtils.getSampleNumbers(numPartitions, this.approximateTotalCount, this.sampleNumber);
// Sampling fraction
final double fraction = SamplingUtils.computeFractionForSampleSize(sampleNumberOfRecords, approximateTotalCount, false);
// samples actually holds envelopes (sampleEnvelopes would be a better name); they are used below to build the partitioning
List<Envelope> samples = this.rawSpatialRDD.sample(false, fraction)
// sample is a built-in RDD operator
.map(new Function<T, Envelope>()
{
@Override
public Envelope call(T geometry)
throws Exception
{
return geometry.getEnvelopeInternal();
}
})
.collect();
// Pad the boundary envelope slightly (on the max side only)
final Envelope paddedBoundary = new Envelope(
boundaryEnvelope.getMinX(), boundaryEnvelope.getMaxX() + 0.01,
boundaryEnvelope.getMinY(), boundaryEnvelope.getMaxY() + 0.01);
switch (gridType) {
//...
case QUADTREE: {
QuadtreePartitioning quadtreePartitioning = new QuadtreePartitioning(samples, paddedBoundary, numPartitions);
partitionTree = quadtreePartitioning.getPartitionTree();
partitioner = new QuadTreePartitioner(partitionTree);
break;
}
case KDBTREE: {
final KDBTree tree = new KDBTree(samples.size() / numPartitions, numPartitions, paddedBoundary);
for (final Envelope sample : samples) {
tree.insert(sample);
}
tree.assignLeafIds();
partitioner = new KDBTreePartitioner(tree);
break;
}
default:
throw new Exception("[AbstractSpatialRDD][spatialPartitioning] Unsupported spatial partitioning method.");
}
this.spatialPartitionedRDD = partition(partitioner);
}
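Building the partitioner from a sample can be sketched without Spark. This is a simplified stand-in, not GeoSpark's KDBTree: it recursively splits the sampled points at the median, alternating x and y, until there are numPartitions leaf rectangles (assumed to be a power of two). GeoSpark's KDBTree instead inserts the sample envelopes and, as far as I can tell from the constructor call above, splits a node once it holds more than samples.size() / numPartitions items. SampleKdPartitioner, sample, and placePoint are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Simplified, hypothetical sample-based KD partitioner (not GeoSpark's KDBTree).
final class SampleKdPartitioner {
    // Leaf rectangles {minX, maxX, minY, maxY}; the list index is the partition id.
    final List<double[]> leaves = new ArrayList<>();

    SampleKdPartitioner(List<double[]> samplePoints, double[] boundary, int numPartitions) {
        if (numPartitions < 1 || Integer.bitCount(numPartitions) != 1) {
            throw new IllegalArgumentException("numPartitions must be a power of two");
        }
        split(samplePoints, boundary.clone(), Integer.numberOfTrailingZeros(numPartitions), 0);
    }

    // Bernoulli sampling without replacement, similar in spirit to rawSpatialRDD.sample(false, fraction).
    static List<double[]> sample(List<double[]> all, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<double[]> out = new ArrayList<>();
        for (double[] p : all) {
            if (rnd.nextDouble() < fraction) {
                out.add(p);
            }
        }
        return out;
    }

    // Cuts box at the sample median on axis (0 = x, 1 = y), then recurses on the other axis.
    private void split(List<double[]> pts, double[] box, int depthLeft, int axis) {
        if (depthLeft == 0) {
            leaves.add(box);
            return;
        }
        double cut;
        if (pts.isEmpty()) { // no samples here: fall back to the box midpoint
            cut = axis == 0 ? (box[0] + box[1]) / 2 : (box[2] + box[3]) / 2;
        } else {
            double[] vals = new double[pts.size()];
            for (int i = 0; i < vals.length; i++) {
                vals[i] = pts.get(i)[axis];
            }
            Arrays.sort(vals);
            cut = vals[vals.length / 2];
        }
        List<double[]> low = new ArrayList<>();
        List<double[]> high = new ArrayList<>();
        for (double[] p : pts) {
            if (p[axis] < cut) low.add(p); else high.add(p);
        }
        double[] lowBox = box.clone();
        double[] highBox = box.clone();
        lowBox[axis == 0 ? 1 : 3] = cut;  // low side: its max becomes the cut
        highBox[axis == 0 ? 0 : 2] = cut; // high side: its min becomes the cut
        split(low, lowBox, depthLeft - 1, 1 - axis);
        split(high, highBox, depthLeft - 1, 1 - axis);
    }

    // Id of the first leaf containing (x, y), or -1 outside the boundary.
    // Leaves share edges, so a point exactly on a cut goes to the lower-numbered leaf.
    int placePoint(double x, double y) {
        for (int i = 0; i < leaves.size(); i++) {
            double[] b = leaves.get(i);
            if (x >= b[0] && x <= b[1] && y >= b[2] && y <= b[3]) {
                return i;
            }
        }
        return -1;
    }
}
```

Because only the sample is sorted, the cost of building the partitioner depends on the sample size rather than on the full dataset, which is the point of sampling in spatialPartitioning.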
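// First, flatMapToPair uses the partitioner to compute the partition_id of each spatialObject, producing (partition_id, spatialObject) pairs
// Then partitionBy redistributes the pairs by partition_id
// Finally, mapPartitions turns each (partition_id, spatialObject) tuple back into a spatialObject
// The choice of operators feels a bit odd, though flatMapToPair (rather than mapToPair) presumably exists because
// placeObject returns an Iterator: an object overlapping several partitions can yield several pairs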
private JavaRDD<T> partition(final SpatialPartitioner partitioner)
{
return this.rawSpatialRDD.flatMapToPair(
new PairFlatMapFunction<T, Integer, T>()
{
@Override
public Iterator<Tuple2<Integer, T>> call(T spatialObject)
throws Exception
{
return partitioner.placeObject(spatialObject);
// returns (id of the partition containing spatialObject, spatialObject) pairs
}
}
).partitionBy(partitioner) // built-in pair-RDD operator: assigns each pair to a partition by its key (the partition_id above)
.mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer, T>>, T>()
{
@Override
public Iterator<T> call(final Iterator<Tuple2<Integer, T>> tuple2Iterator)
throws Exception
{
return new Iterator<T>()
{
@Override
public boolean hasNext()
{
return tuple2Iterator.hasNext();
}
@Override
public T next()
{
return tuple2Iterator.next()._2();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
}, true); // preservesPartitioning = true: keep the spatial partitioner on the result
}
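The three-operator chain in partition() can be emulated in a single JVM to show why a flatMap is involved: placeObject returns an Iterator, and an object whose envelope overlaps several leaves presumably yields one pair per leaf, so it ends up duplicated across partitions. A minimal sketch with hypothetical names (PartitionPipeline; rectangles and leaves as {minX, maxX, minY, maxY} arrays), not GeoSpark code:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical emulation of flatMapToPair -> partitionBy -> mapPartitions on plain lists.
final class PartitionPipeline {
    // Like placeObject: one (partitionId, object) pair per leaf the object overlaps.
    static List<Map.Entry<Integer, double[]>> placeObject(double[] obj, List<double[]> leaves) {
        List<Map.Entry<Integer, double[]>> pairs = new ArrayList<>();
        for (int id = 0; id < leaves.size(); id++) {
            double[] b = leaves.get(id);
            boolean overlaps = obj[0] <= b[1] && obj[1] >= b[0] && obj[2] <= b[3] && obj[3] >= b[2];
            if (overlaps) {
                pairs.add(new SimpleEntry<>(id, obj));
            }
        }
        return pairs;
    }

    static List<List<double[]>> partition(List<double[]> objects, List<double[]> leaves) {
        // flatMapToPair: each object becomes zero or more (id, object) pairs
        List<Map.Entry<Integer, double[]>> pairs = new ArrayList<>();
        for (double[] obj : objects) {
            pairs.addAll(placeObject(obj, leaves));
        }
        // partitionBy: route every pair to the bucket named by its key
        List<List<Map.Entry<Integer, double[]>>> buckets = new ArrayList<>();
        for (int i = 0; i < leaves.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<Integer, double[]> e : pairs) {
            buckets.get(e.getKey()).add(e);
        }
        // mapPartitions: drop the key and keep only the object, like tuple._2()
        List<List<double[]>> result = new ArrayList<>();
        for (List<Map.Entry<Integer, double[]>> bucket : buckets) {
            List<double[]> objs = new ArrayList<>();
            for (Map.Entry<Integer, double[]> e : bucket) {
                objs.add(e.getValue());
            }
            result.add(objs);
        }
        return result;
    }
}
```

With two leaves split at x = 5, a rectangle spanning x in [4, 6] lands in both partitions, so three input objects produce four partitioned records.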