Source Code Analysis of SpatialHadoop's Two-Level Spatial Index Mechanism
SpatialHadoop has not been updated for a long time, and the MapReduce framework is fairly inefficient, so it is not really suitable for direct use anymore, but its implementation is still worth studying. I recently wanted to revisit spatial indexing on HDFS, so building on the basic usage from two years ago (hard to believe it has been two years since I last ran SpatialHadoop..) (a short write-up of basic usage), this post records how its spatial indexing works, focusing on job submission on Hadoop, parallel index construction, and how indexes are stored and read.
Compared with GeoSpark, there are no ready-made Spark operators to reuse and the file handling has to be done by hand, so the logic is somewhat more involved.
Submitting a Spatial Analysis Job
SpatialHadoop ships a script for basic spatial processing; for example, the following command generates test data.
sbin/shadoop generate test.rects size:1.gb shape:rect mbr:0,0,1000000,1000000 -overwrite
The shadoop script itself does very little: it launches the class edu.umn.cs.spatialHadoop.operations.Main through Hadoop's run command, and the main function of that class parses the input arguments.
bin=`dirname "$0"`
bin=`cd "$bin" > /dev/null; pwd`
# Call Hadoop with the operations.Main as the main class
. "$bin"/hadoop edu.umn.cs.spatialHadoop.operations.Main $@
The main function uses Hadoop's ProgramDriver to run the concrete operation class. It first reads the list of supported classes from the configuration file spatial-operations.yaml, then uses reflection to read the shortName field of each class's OperationMetadata annotation; the shortName decides which class the arguments are forwarded to.
public static void main(String[] args) {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
// Load the configuration file; spatial-operations.yaml contains the fully qualified names of the supported operation classes
Yaml yaml = new Yaml();
List<String> ops = yaml.load(SpatialSite.class.getResourceAsStream("/spatial-operations.yaml"));
// Use reflection to read the shortName from the OperationMetadata annotation of each class; at runtime the class is selected by its shortName.
// For the random data generation above, "generate" maps to edu.umn.cs.spatialHadoop.operations.RandomSpatialGenerator.
for (String op : ops) {
Class<?> opClass = Class.forName(op);
OperationMetadata opMetadata = opClass.getAnnotation(OperationMetadata.class);
pgd.addClass(opMetadata.shortName(), opClass, opMetadata.description());
}
pgd.driver(args); // This call dispatches to the concrete class that runs the task
exitCode = 0;
}
catch(Throwable e){
e.printStackTrace();
}
System.exit(exitCode);
}
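To make the annotation-driven registration more concrete, here is a minimal, self-contained sketch of the same idea. The OpMeta annotation and the DemoGenerate class are hypothetical stand-ins written for this post, not SpatialHadoop's actual OperationMetadata or RandomSpatialGenerator; only the reflection pattern mirrors the Main function above.
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.List;
public class RegistrationSketch {
  // Hypothetical stand-in for SpatialHadoop's OperationMetadata annotation
  @Retention(RetentionPolicy.RUNTIME)
  @interface OpMeta {
    String shortName();
    String description();
  }
  // Hypothetical operation class; in SpatialHadoop this role is played by
  // classes such as RandomSpatialGenerator, registered under shortName "generate"
  @OpMeta(shortName = "generate", description = "Generates random spatial data")
  public static class DemoGenerate {
    public static void main(String[] args) {
      System.out.println("running generate with args " + Arrays.toString(args));
    }
  }
  public static void main(String[] args) throws Exception {
    // The class names would normally come from spatial-operations.yaml
    List<String> ops = Arrays.asList(RegistrationSketch.class.getName() + "$DemoGenerate");
    for (String op : ops) {
      Class<?> opClass = Class.forName(op);
      OpMeta meta = opClass.getAnnotation(OpMeta.class);
      // The real code calls ProgramDriver.addClass(meta.shortName(), opClass, meta.description());
      // here we simply dispatch by shortName directly
      if (args.length > 0 && args[0].equals(meta.shortName())) {
        opClass.getMethod("main", String[].class)
               .invoke(null, (Object) Arrays.copyOfRange(args, 1, args.length));
      }
    }
  }
}
Running it as java RegistrationSketch generate foo dispatches to DemoGenerate.main, mirroring how shadoop generate ... ends up in RandomSpatialGenerator.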
The Two-Level Spatial Index Mechanism
bin/shadoop index <input> <output> shape:<input format> sindex:<index> blocksize:<size> -overwrite
# Example
shadoop index test.rects test.grid sindex:grid shape:rect
Following the mechanism described above, the main function of the Indexer class is invoked, which then submits the job through the call to index(inputPaths, outputPath, params).
public static Job index(Path[] inPaths, Path outPath, OperationsParams params)
throws IOException, InterruptedException, ClassNotFoundException {
// initializeIndexers mainly selects the local index class and builds the global index
// The indexing method is resolved to a concrete class from params
// The global index can be built either on the raw data or on a sample of it
// Here the Partitioner and the global index refer to the same object
// For the grid index, initialization just calls the GridPartitioner constructor to compute the grid size and other basic parameters
Partitioner p = initializeIndexers(inPaths, outPath, params);
if (OperationsParams.isLocal(new JobConf(params), inPaths)) {
indexLocal(inPaths, outPath, p, params);
return null;
} else {
// Submit the MapReduce job
Job job = indexMapReduce(inPaths, outPath, p, params);
return job;
}
}
// The main configuration of the MR job is as follows
static Job indexMapReduce(Path[] inPaths, Path outPath, Partitioner partitioner,
OperationsParams paramss) throws IOException, InterruptedException,
ClassNotFoundException {
// ...
// Set mapper and reducer
Shape shape = OperationsParams.getShape(conf, "shape");
job.setMapperClass(PartitionerMap.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(shape.getClass());
job.setReducerClass(PartitionerReduce.class);
// Set input and output
job.setInputFormatClass(SpatialInputFormat3.class);
SpatialInputFormat3.setInputPaths(job, inPaths);
job.setOutputFormatClass(IndexOutputFormat.class);
IndexOutputFormat.setOutputPath(job, outPath);
//...
}
Hadoop handles file input through an InputFormat. The common pattern is to extend FileInputFormat and, much like TextInputFormat, take care of the details in between: getSplits splits the input files into multiple InputSplits, createRecordReader creates a RecordReader for each InputSplit to read the actual data, and the map task then uses the RecordReader to parse the InputSplit into key-value pairs.
For the input files, the data is split by SpatialInputFormat3 (why the trailing 3 when there is no 1 or 2 to be seen..), which extends FileInputFormat. The InputSplit generation is fairly conventional: files are split into InputSplits based on splitSize, and splits are then merged according to data locality to reduce the number of tasks. In createRecordReader, the file extension in the InputSplit is used to check whether the file already carries a local index; if so, a LocalIndexRecordReader is returned, otherwise the RecordReader configured for that extension is returned (SpatialRecordReader3 by default).
The RecordReader parses the entire InputSplit into key-value pairs of the form <Partition, Iterable<V>>: the key corresponds to the whole InputSplit and the value is an iterator over the parsed records, where V is the spatial data type and extends Shape.
For files that have not been indexed yet, the file is simply read line by line and each line of text is converted into a concrete Shape.
protected boolean nextShape(V s) throws IOException {
do {
if (!nextLine(tempLine))
return false;
s.fromText(tempLine);
} while (!isMatched(s));
return true;
}
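As a reference for the s.fromText(tempLine) call above, here is a simplified, self-contained sketch of parsing one text line into a rectangle. The SimpleRect class and the field layout (comma-separated x1,y1,x2,y2) are assumptions for illustration; the real Shape implementations use SpatialHadoop's own Text utilities and formats.
// A minimal rectangle "shape" that can be filled from one text line,
// mirroring the role of s.fromText(tempLine) above (format is assumed)
public class SimpleRect {
  public double x1, y1, x2, y2;
  public void fromText(String line) {
    // Assume one record per line: x1,y1,x2,y2
    String[] parts = line.trim().split(",");
    x1 = Double.parseDouble(parts[0]);
    y1 = Double.parseDouble(parts[1]);
    x2 = Double.parseDouble(parts[2]);
    y2 = Double.parseDouble(parts[3]);
  }
  public static void main(String[] args) {
    SimpleRect r = new SimpleRect();
    r.fromText("10.0,20.0,30.0,40.0");
    System.out.printf("rect = (%.1f, %.1f, %.1f, %.1f)%n", r.x1, r.y1, r.x2, r.y2);
  }
}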
The map logic is fairly conventional: iterate over all shapes, determine which partitions each shape overlaps, and emit <partitionID, shape> key-value pairs to the reduce stage.
// Some exception-handling code removed
@Override
protected void map(Rectangle key, Iterable<? extends Shape> shapes,
final Context context) throws IOException, InterruptedException {
final IntWritable partitionID = new IntWritable();
for (final Shape shape : shapes) {
Rectangle shapeMBR = shape.getMBR();
if (disjoint) {
// A callback is nested here, which makes it easier to pass along class members
partitioner.overlapPartitions(shape, new ResultCollector<Integer>() {
@Override
public void collect(Integer r) {
partitionID.set(r);
context.write(partitionID, shape);
}
});
} else {
partitionID.set(partitioner.overlapPartition(shape));
if (partitionID.get() >= 0)
context.write(partitionID, shape);
}
context.progress();
}
}
@Override
public void overlapPartitions(Shape shape, ResultCollector<Integer> matcher) {
if (shape == null)
return;
Rectangle shapeMBR = shape.getMBR();
if (shapeMBR == null)
return;
int col1, col2, row1, row2;
col1 = (int)Math.floor((shapeMBR.x1 - x) / tileWidth);
col2 = (int)Math.ceil((shapeMBR.x2 - x) / tileWidth);
row1 = (int)Math.floor((shapeMBR.y1 - y) / tileHeight);
row2 = (int)Math.ceil((shapeMBR.y2 - y) / tileHeight);
if (col1 < 0) col1 = 0;
if (row1 < 0) row1 = 0;
for (int col = col1; col < col2; col++)
for (int row = row1; row < row2; row++)
matcher.collect(getCellNumber(col, row));
}
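The floor/ceil arithmetic above maps a shape's MBR to the range of grid cells it overlaps. The following self-contained sketch reproduces that computation; the grid parameters and the row-major getCellNumber are assumptions for illustration and may not match GridPartitioner's actual cell numbering.
import java.util.ArrayList;
import java.util.List;
// Stand-alone sketch of mapping an MBR to the grid cells it overlaps
// (grid origin, tile sizes, and the row-major cell numbering are assumptions)
public class GridOverlapSketch {
  double x = 0, y = 0;            // grid origin
  double tileWidth = 100, tileHeight = 100;
  int numColumns = 10;            // used by the assumed row-major numbering
  int getCellNumber(int col, int row) {
    return row * numColumns + col; // assumed row-major layout
  }
  List<Integer> overlapPartitions(double x1, double y1, double x2, double y2) {
    List<Integer> cells = new ArrayList<>();
    int col1 = (int) Math.floor((x1 - x) / tileWidth);
    int col2 = (int) Math.ceil((x2 - x) / tileWidth);
    int row1 = (int) Math.floor((y1 - y) / tileHeight);
    int row2 = (int) Math.ceil((y2 - y) / tileHeight);
    if (col1 < 0) col1 = 0;
    if (row1 < 0) row1 = 0;
    for (int col = col1; col < col2; col++)
      for (int row = row1; row < row2; row++)
        cells.add(getCellNumber(col, row));
    return cells;
  }
  public static void main(String[] args) {
    // An MBR spanning parts of two columns and two rows overlaps four cells
    System.out.println(new GridOverlapSketch().overlapPartitions(50, 50, 150, 150));
  }
}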
The reduce stage opens an output stream per partition_id and writes the corresponding shapes to it line by line. For files without a local index, the data goes straight into the final output; for indexed files, getOrCreateDataOutput returns an output stream to a temporary file, and once writing finishes the whole file is read back into memory to build the local index (so an extra write, an extra read, plus text parsing?), after which the resulting file is written to HDFS.
RRStarLocalIndex is the only class implementing the LocalIndex interface, so the local index apparently supports only the R*-tree.
@Override
protected void reduce(IntWritable partitionID, Iterable<Shape> shapes,
Context context) throws IOException, InterruptedException {
LOG.info("Working on partition #"+partitionID);
for (Shape shape : shapes) {
context.write(partitionID, shape);
context.progress();
}
// Indicate end of partition to close the file
// In the OutputFormat, an id less than 0 means writing is done and the output stream can be closed
context.write(new IntWritable(-partitionID.get()-1), null);
LOG.info("Done with partition #"+partitionID);
}
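The end-of-partition marker is encoded as -id-1 rather than just -id, presumably so that partition 0 can also produce a strictly negative sentinel. A tiny round-trip check of that encoding:
// Round trip of the end-of-partition sentinel used by the reducer and IndexOutputFormat:
// encode as -id-1 so that even partition 0 maps to a strictly negative value
public class SentinelSketch {
  static int encode(int partitionID) { return -partitionID - 1; }
  static int decode(int sentinel)    { return -sentinel - 1; }
  public static void main(String[] args) {
    for (int id : new int[] {0, 1, 42}) {
      int s = encode(id);
      System.out.printf("id=%d -> sentinel=%d -> decoded=%d%n", id, s, decode(s));
    }
  }
}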
// In IndexOutputFormat, the data is written to the corresponding file.
// You may wonder at this point where the LocalIndex went, since the file is not indexed here. There is a
// seemingly inefficient step: files that need a local index are first written to a temporary file, and when
// writing finishes (in the closePartition function) a new thread builds the local spatial index over the temporary file and uploads it
@Override
public void write(IntWritable partitionID, S value) throws IOException {
int id = partitionID.get();
if (id < 0) {
// An indicator to close a partition
int partitionToClose = -id - 1;
this.closePartition(partitionToClose);
} else {
// An actual object that we need to write
// The OutputStream for each id is kept in a ConcurrentHashMap
OutputStream output = getOrCreateDataOutput(id);
tempText.clear();
value.toText(tempText);
byte[] bytes = tempText.getBytes();
output.write(bytes, 0, tempText.getLength());
output.write(NEW_LINE); // The data is stored as text lines, not in a binary format
Partition partition = partitionsInfo.get(id);
partition.recordCount++;
partition.size += tempText.getLength() + NEW_LINE.length;
partition.expand(value);
if (shape == null)
shape = (S) value.clone();
}
}
private OutputStream getOrCreateDataOutput(int id) throws IOException {
OutputStream out = partitionsOutput.get(id);
if (out == null) {
// First time to write in this partition. Store its information
Partition partition = new Partition();
if (localIndexClass == null) {
// No local index needed. Write to the final file directly
Path path = getPartitionFile(id);
out = outFS.create(path);
partition.filename = path.getName();
} else {
// Write to a temporary file that will later get indexed
File tempFile = File.createTempFile(String.format("part-%05d", id), "lindex");
out = new BufferedOutputStream(new FileOutputStream(tempFile));
tempFiles.put(id, tempFile);
}
partition.cellId = id;
// Set the rectangle to the opposite universe so that we can keep
// expanding it to get the MBR of this partition
partition.set(Double.MAX_VALUE, Double.MAX_VALUE,
-Double.MAX_VALUE, -Double.MAX_VALUE);
// Store in the hashtables for further use
partitionsOutput.put(id, out);
partitionsInfo.put(id, partition);
}
return out;
}
// Some exception handling omitted
private void closePartition(final int id) {
final Partition partitionInfo = partitionsInfo.get(id);
final OutputStream outStream = partitionsOutput.get(id);
final File tempFile = tempFiles.get(id);
Thread closeThread = new Thread() {
@Override
public void run() {
try {
outStream.close();
if (localIndexClass != null) {
// Build a local index for that file
try {
LocalIndex<S> localIndex = localIndexClass.newInstance();
localIndex.setup(conf);
Path indexedFilePath = getPartitionFile(id);
partitionInfo.filename = indexedFilePath.getName();
// The logic of this call is covered later:
// read tempFile into memory, parse it, build an R*-tree from the MBRs of all records, then write the indexed file to HDFS
localIndex.buildLocalIndex(tempFile, indexedFilePath, shape);
// Temporary file no longer needed
tempFile.delete();
}
}
if (disjoint) {
// If data is replicated, we need to shrink down the size of the
// partition to keep partitions disjoint
partitionInfo.set(partitionInfo.getIntersection(partitioner.getPartition(id)));
}
Text partitionText = partitionInfo.toText(new Text());
synchronized (masterFile) {
// Write partition information to the master file
masterFile.write(partitionText.getBytes(), 0, partitionText.getLength());
masterFile.write(NEW_LINE);
}
}
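To summarize the close-partition flow described above (close the temp stream, build the index over the temp file, write the indexed file, record the partition in the master file), here is a self-contained sketch using local files in place of HDFS. The MBR computation stands in for the R*-tree built by LocalIndex.buildLocalIndex, and the per-line field layout and master-file record format are assumptions for illustration.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
// Flow sketch of "close partition": the temp file written by the reducer is read back,
// an index is built over it, the indexed file is written out, and the partition's
// metadata is appended to the master file. Local files stand in for HDFS, and the
// partition MBR stands in for the R*-tree that buildLocalIndex would construct.
public class ClosePartitionSketch {
  public static void closePartition(Path tempFile, Path indexedFile, Path masterFile, int id)
      throws IOException {
    List<String> lines = Files.readAllLines(tempFile, StandardCharsets.UTF_8);
    double x1 = Double.MAX_VALUE, y1 = Double.MAX_VALUE;
    double x2 = -Double.MAX_VALUE, y2 = -Double.MAX_VALUE;
    for (String line : lines) {              // expand the partition MBR record by record
      String[] p = line.split(",");          // assumed x1,y1,x2,y2 per line
      x1 = Math.min(x1, Double.parseDouble(p[0]));
      y1 = Math.min(y1, Double.parseDouble(p[1]));
      x2 = Math.max(x2, Double.parseDouble(p[2]));
      y2 = Math.max(y2, Double.parseDouble(p[3]));
    }
    // Real code: localIndex.buildLocalIndex(tempFile, indexedFilePath, shape) writes
    // data + R*-tree + footer; here we just copy the data as the "indexed" file.
    Files.write(indexedFile, lines, StandardCharsets.UTF_8);
    Files.deleteIfExists(tempFile);          // temporary file no longer needed
    // Append one line of partition metadata to the master file (field order assumed)
    String record = String.format("%d,%s,%d,%f,%f,%f,%f%n",
        id, indexedFile.getFileName(), lines.size(), x1, y1, x2, y2);
    Files.write(masterFile, record.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }
  public static void main(String[] args) throws IOException {
    Path temp = Files.createTempFile("part-00001", ".tmp");
    Files.write(temp, List.of("0,0,10,10", "5,5,20,20"), StandardCharsets.UTF_8);
    closePartition(temp, Paths.get("part-00001.indexed"), Paths.get("_master.sketch"), 1);
  }
}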
Layout of the Spatial Index File
How the R*-tree is written to a file: the implementation does not provide the usual R-tree interface with dynamic insertion and deletion; it only implements what is essentially a serialization format. The tree itself is stored in arrays rather than linked via node pointers, so serialization breaks down into three parts:
- writing the actual spatial data
- writing the R-tree that indexes that data
- writing the metadata
Placing this metadata at the end of the file is a slightly unconventional choice. See the unit test LocalIndexRecordReaderTest, which builds an index over a spatial file, stores it, and then reads it back with LocalIndexRecordReader.
/**
* <ul>
* <li>
* Data Entries: First, all data entries are written in an order that is consistent
* with the R-tree structure. This order will guarantee that all data entries
* under any node (from the root to leaves) will be adjacent in that order.
* </li>
* <li>
* Tree structure: This part contains the structure of the tree represented
* by its nodes. The nodes are stored in a level order traversal. This guarantees
* that the root will always be the first node and that all siblings will be
* stored consecutively. Each node contains the following information:
* (1) (n) Number of children as a 32-bit integer,
* (2) n Pairs of (child offset, MBR=(x1, y1, x2, y2)). The child offset is
* the offset of the beginning of the child data (node or data entry) in the
* tree where 0 is the offset of the first data entry.
* </li>
* <li>
* Tree footer: This section contains some meta data about the tree as
* follows. All integers are 32-bits.
* (1) MBR of the root as (x1, y1, x2, y2),
* (2) Number of data entries,
* (3) Number of non-leaf nodes,
* (4) Number of leaf nodes,
* (5) Tree structure offset: offset of the beginning of the tree structure section
* (6) Footer offset: offset of the beginning of the footer as a 32-bit integer.
* (7) Tree size: Total tree size in bytes including data+structure+footer
* </li>
*
* </ul>
* @param out
* @throws IOException
*/
public void write(DataOutput out, Serializer ser) throws IOException {
// Tree data: write the data entries in the tree order
// Since we write the data first, we will have to traverse the tree twice
// first time to visit and write the data entries in the tree order,
// and second time to visit and write the tree nodes in the tree order.
Deque<Integer> nodesToVisit = new ArrayDeque<Integer>();
nodesToVisit.add(root);
int[] objectOffsets = new int[numOfDataEntries() + numOfNodes()];
// Keep track of the offset of each data object from the beginning of the
// data section
int dataOffset = 0;
// Keep track of the offset of each node from the beginning of the tree
// structure section
// This looks like a breadth-first traversal, processing the nodes level by level
int nodeOffset = 0;
while (!nodesToVisit.isEmpty()) {
int node = nodesToVisit.removeFirst();
// The node is supposed to be written in this order.
// Measure its offset and accumulate its size
objectOffsets[node] = nodeOffset;
nodeOffset += 4 + (4 + 8 * 4) * Node_size(node);
if (isLeaf.get(node)) {
// Leaf node, write the data entries in order
for (int child : children.get(node)) {
objectOffsets[child] = dataOffset;
if (ser != null)
dataOffset += ser.serialize(out, child);
}
} else {
// Internal node, recursively traverse its children
for (int child : children.get(node))
nodesToVisit.addLast(child);
}
}
// Update node offsets as they are written after the data entries
for (int i = 0; i < numNodes; i++)
objectOffsets[i + numEntries] += dataOffset;
// Tree structure: Write the nodes in tree order
nodesToVisit.add(root);
while (!nodesToVisit.isEmpty()) {
int node = nodesToVisit.removeFirst();
// (1) Number of children
out.writeInt(Node_size(node));
for (int child : children.get(node)) {
// (2) Write the offset of the child
out.writeInt(objectOffsets[child]);
// (3) Write the MBR of each child
out.writeDouble(x1s[child]);
out.writeDouble(y1s[child]);
out.writeDouble(x2s[child]);
out.writeDouble(y2s[child]);
}
// If node is internal, add its children to the nodes to be visited
if (!isLeaf.get(node)) {
for (int child : children.get(node))
nodesToVisit.addLast(child);
}
}
// Tree footer
int footerOffset = dataOffset + nodeOffset;
// (1) MBR of the root
out.writeDouble(x1s[root]);
out.writeDouble(y1s[root]);
out.writeDouble(x2s[root]);
out.writeDouble(y2s[root]);
// (2) Number of data entries
out.writeInt(numOfDataEntries());
// (3) Number of non-leaf nodes
out.writeInt((int) (numOfNodes() - isLeaf.countOnes()));
// (4) Number of leaf nodes
out.writeInt((int) isLeaf.countOnes());
// (5) Offset of the tree structure section
out.writeInt(dataOffset);
// (6) Offset of the footer
out.writeInt(footerOffset);
// (7) Size of the entire tree
int footerSize = 4 * 8 + 6 * 4;
out.writeInt(footerOffset + footerSize);
}
Since the R-tree is organized as arrays, reading it back only requires reading the file sequentially into those arrays. Because the metadata sits at the end of the file, LocalIndexRecordReader uses the following trick: seek near the end of the file and read the metadata from there:
in.seek(indexEnd - 4);
int indexSize = in.readInt();
long indexStart = indexEnd - indexSize - 4;
// ...
in.seek(indexStart);
localIndex.read(in, indexStart, indexEnd, stockShape); // read and rebuild the R-tree (effectively deserialization)
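Given the footer layout documented in write() above (the root MBR as 4 doubles followed by 6 ints, 56 bytes in total), a sketch of reading the footer back could look like the following. The file name and the assumption that the footer ends exactly at the end of the file are illustrative only; as shown above, the actual LocalIndexRecordReader first reads the tree size from the last bytes of the index region.
import java.io.IOException;
import java.io.RandomAccessFile;
// Reads back the R*-tree footer written at the end of the serialized tree,
// following the layout documented in write(): MBR (4 doubles) + 6 ints = 56 bytes.
// The file path and the treeEnd offset are assumptions for illustration.
public class FooterReadSketch {
  public static void main(String[] args) throws IOException {
    try (RandomAccessFile in = new RandomAccessFile("part-00000.indexed", "r")) {
      long treeEnd = in.length();             // assume the tree ends at the end of the file
      final int FOOTER_SIZE = 4 * 8 + 6 * 4;  // same constant as footerSize in write()
      in.seek(treeEnd - FOOTER_SIZE);
      double rootX1 = in.readDouble();        // (1) MBR of the root
      double rootY1 = in.readDouble();
      double rootX2 = in.readDouble();
      double rootY2 = in.readDouble();
      int numDataEntries = in.readInt();      // (2) number of data entries
      int numNonLeafNodes = in.readInt();     // (3) number of non-leaf nodes
      int numLeafNodes = in.readInt();        // (4) number of leaf nodes
      int treeStructureOffset = in.readInt(); // (5) offset of the tree structure section
      int footerOffset = in.readInt();        // (6) offset of the footer
      int treeSize = in.readInt();            // (7) total size: data + structure + footer
      long treeStart = treeEnd - treeSize;    // where the data section begins
      System.out.printf("root MBR=(%f,%f,%f,%f), entries=%d, tree starts at offset %d%n",
          rootX1, rootY1, rootX2, rootY2, numDataEntries, treeStart);
    }
  }
}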