Thursday, March 31, 2016

Around the world in one hour! (revisit)

In this blog post, we revisit an earlier post about extracting data from the OpenStreetMap Planet.osm file. We still use the same Pigeon-based extraction script, but we make it modular and easier to reuse by using Pig's macro definitions to move the common code into a separate file. In the following, we first describe the osmx.pig file, which contains the reusable macros, and then we describe how to use it in your own Pig script.


The osmx.pig file contains all the common code that is used to extract points, ways, or relations from an OSM file. It contains the following macros.


LoadOSMNodes

This macro extracts all the nodes from an OSM file. It returns a dataset that contains tuples of the following format.



LoadOSMWaysWithSegments

This macro returns all the ways in the file. Each way is returned as a series of line segments, where each segment connects two consecutive nodes on the way. It returns a dataset with tuples of the following format.
segment_id (long): A generated unique ID for each segment
id1 (long): The ID of the starting node
latitude1 (double): Latitude of the starting node
longitude1 (double): Longitude of the starting node
id2 (long): The ID of the ending node
latitude2 (double): Latitude of the ending node
longitude2 (double): Longitude of the ending node
way_id (long): The ID of the way that contains this segment
tags (map[(chararray)]): All the tags of the way


This macro also returns all the ways in the file. However, unlike LoadOSMWaysWithSegments, it returns one tuple per way, containing the entire geometry of that way. Each tuple is formatted as follows.

way_id (long): The ID of the way as it appears in the OSM file
first_node_id (long): The ID of the first node in this way
last_node_id (long): The ID of the last node in this way
geom (bytearray): The geometry of the way
tags (map[(chararray)]): The tags of the way as they appear in the OSM file
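
As a quick illustration (not part of the original post), this macro is used just like the others. The name LoadOSMWays below is only a placeholder for whatever the whole-geometry macro is called in osmx.pig, and the tag filter is an arbitrary example; the REGISTER/IMPORT header is the same as the one shown for planet-extractor.pig further down.

-- 'LoadOSMWays' is a placeholder for the whole-geometry macro described above
all_ways = LoadOSMWays('$input');
-- Keep only ways tagged as water bodies (illustrative tag filter)
lakes = FILTER all_ways BY tags#'natural' == 'water';
STORE lakes INTO '$output/lakes.bz2';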


LoadOSMObjects

This macro returns all the objects in the OSM file. An object can be one of two cases:
  1. First-level relations: relations that contain only ways.
  2. Dangled ways: ways that are not part of any relation.
The returned dataset does not contain second-level relations, i.e., relations that contain other relations. The format of the returned dataset is as follows.
object_id (long): The ID of either the relation or the way
geom (bytearray): The geometry of the object
tags (map[(chararray)]): The tags of the way or the relation as they appear in the OSM file


The script planet-extractor.pig provides an example that extracts the datasets available on the SpatialHadoop datasets page. The header of this file registers the required JAR libraries and imports the 'osmx.pig' file.

REGISTER spatialhadoop-2.4.jar;
REGISTER pigeon-0.2.1.jar;
REGISTER esri-geometry-api-1.2.jar;
REGISTER jts-1.8.jar;
IMPORT 'osmx.pig';

The next two lines extract all the nodes and write them to a file.

all_nodes = LoadOSMNodes('$input');

STORE all_nodes INTO '$output/all_nodes.bz2';

This is much simpler than the earlier code, where the extraction logic was interleaved with writing the output.
Similarly, the following few lines extract the road network and write it to the output.

-- Extract road network
road_network = LoadOSMWaysWithSegments('$input');
road_network = FILTER road_network BY edu.umn.cs.spatialHadoop.osm.HasTag(tags,
road_network = FOREACH road_network GENERATE segment_id,
               id1, latitude1, longitude1,
               id2, latitude2, longitude2,
               way_id, edu.umn.cs.spatialHadoop.osm.MapToJson(tags) AS tags;
STORE road_network INTO '$output/road_network.bz2';

Although the code looks a bit ugly, it contains only four statements. The first extracts all the ways as segments using the LoadOSMWaysWithSegments macro. The second filters the segments that belong to the road network using the tags attribute. The third removes unnecessary columns, and the fourth writes the output.

Similar to the road network, the next few lines extract and store the buildings dataset.

all_objects = LoadOSMObjects('$input');
buildings = FILTER all_objects BY edu.umn.cs.spatialHadoop.osm.HasTag(tags,
STORE buildings INTO '$output/buildings.bz2';

The first statement extracts all the objects from the file. The second statement filters the buildings using the tags attribute. Finally, the third statement stores the output.
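
If you also want the buildings' tags in a friendlier format, a projection step similar to the road-network example could be added before the STORE statement. This extra statement is a suggestion, not part of the original script; it reuses the MapToJson function shown above and the column names from the LoadOSMObjects format.

-- Optional cleanup (not in the original script): keep only the ID and geometry,
-- and convert the tags map to a JSON string as done for the road network above
buildings = FOREACH buildings GENERATE object_id, geom,
            edu.umn.cs.spatialHadoop.osm.MapToJson(tags) AS tags;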


This work was partially supported by an AWS in Education Grant.


Saturday, February 20, 2016

HadoopViz: Extensible Visualization of Big Spatial Data

With the huge sizes of today's spatial data, a common functionality that users look for is to visualize the data to see what it looks like. This gives users the power to quickly explore huge new datasets. For example, the video below summarizes 1 trillion points that represent the temperature of each 1 km² of the earth's surface on every day from 2009 to 2014 (a total of six years).
The video consists of 72 frames, one per month, put together into a single video. While one could use a single machine to produce these 72 images, it might take up to 60 hours due to the huge size of the input.
In this blog post, we describe how to use HadoopViz, an extensible visualization framework based on SpatialHadoop, to visualize the same dataset in just three hours using a cluster of 10 machines.
In addition to single-level images, which are typically of limited resolution, HadoopViz can also produce multilevel images in which users can interactively zoom in and out to explore huge datasets in great detail. For example, the image below is a visualization of a 92GB dataset that represents all the objects extracted from the OpenStreetMap dataset. You can pan and zoom in this image to view more details about a specific area.


In a nutshell, HadoopViz uses the parallelization power of MapReduce along with the efficiency of SpatialHadoop to partition the data into smaller parts, visualize each part separately into a smaller image, and then put these partial images together to produce the final image. HadoopViz builds on this idea and provides four key features that make it easy to use and very efficient.
  1. HadoopViz piggybacks data smoothing on visualization, allowing it to smooth the data on the fly as the image is generated.
  2. HadoopViz automatically decides the best way to partition the data, allowing it to generate both small and large images efficiently.
  3. HadoopViz can also produce multilevel images where users can freely pan and zoom to interactively explore huge datasets.
  4. Instead of customizing the algorithm for a specific use case, e.g., satellite data, HadoopViz provides an extensible implementation that can support a wide range of visualization types.
Below, we first describe how to generate the visualizations shown above using HadoopViz, which ships with the recent version of SpatialHadoop. Then, we describe some technical details about the smoothing, partitioning, and extensibility features.

How to ...

... generate the temperature video

  1. You need to download and setup the most recent version of SpatialHadoop which ships with HadoopViz as its visualization package. Check this page for more details about setting up the most recent version of SpatialHadoop on both Hadoop 1.x and Hadoop 2.x.
  2. Download the temperature dataset you would like to visualize. The temperature dataset we used can be obtained from the LP DAAC archive at this link.
    You can use this ruby script to download all the data for the six years if you have a good internet connection and enough storage on your machine. Run it using the following command:
    ruby hdf_downloader.rb time:2009.01.01..2014.12.31
  3. Once you have all the data, you can upload it to HDFS using the 'copyFromLocal' command. Let's assume the data is available at hdfs://user/hadoop/temperature
  4. To visualize the 72 frames, run the following SpatialHadoop command
    shadoop multihdfplot hdfs://user/hadoop/temperature combine:31 dataset:LST_Day_1km hdfs://user/hadoop/frames/ time:2009.01.01..2014.12.31
  5. The frames will be available in the output path hdfs://user/hadoop/frames. Download them to your local machine using the 'copyToLocal' command.
  6. Now, upload the frames to YouTube which will put them together into a video similar to the one shown above.

... generate the multilevel image

  1. Follow step 1 above to download and install SpatialHadoop, if you haven't done so already.
  2. Download the 'All objects' dataset at the following link
  3. Upload the file to HDFS using the 'copyFromLocal' command. Let's assume it is uploaded to hdfs://user/hadoop/objects/
    NB: You don't have to decompress the file as SpatialHadoop can decompress it on the fly while visualizing. However, if you upload the compressed file, you need to keep the .bz2 extension to tell SpatialHadoop it is compressed.
  4. To generate a multilevel image with 11 levels similar to the one shown above, type the following command
    shadoop gplot hdfs://user/hadoop/objects -pyramid levels:11 hdfs://user/hadoop/multilevel shape:osm
  5. The generated image will be available at hdfs://user/hadoop/multilevel. Download it to your machine using the 'copyToLocal' command.
  6. To view the image in your browser, open the 'index.html' file available in the output directory.


In visualization, smoothing means fusing nearby records according to the visualization logic in order to produce a correct result. For example, satellite datasets typically contain holes that result from clouds obstructing the satellite's view. A smoothing function can recover these holes by estimating the missing values using simple interpolation techniques. The two figures below show an example of how a smoothing function can recover missing points.
Original data without smoothing
Data is smoothed using HadoopViz
HadoopViz supports on-the-fly smoothing of the data while the visualization is performed. This means that you can easily plug in a different smoothing function and regenerate the image without having to carry out the expensive smoothing step separately.


HadoopViz supports two ways of partitioning the data, which affect how it merges the intermediate partial images. It can use either the default HDFS partitioning or the spatial partitioning that ships with SpatialHadoop.

Default HDFS Partitioning

By default, when you upload a file to HDFS, it is partitioned into equi-sized chunks of 128MB each. The spatial locations of records are not taken into account, so nearby records typically end up in different partitions. This means that every partition may cover the entire input space, and we end up overlaying the intermediate images to produce the final image, as shown below.
Overlay intermediate images

Spatial Partitioning

If we use the spatial partitioning that ships with SpatialHadoop, each partition contains data from only a small, limited region of space, and we end up stitching the intermediate images together, as shown below.
Stitch intermediate images

Which partitioning technique is better?

While both techniques produce the same final answer, their performance can differ, so HadoopViz needs to decide automatically which one to use. First of all, if the data needs to be smoothed, HadoopViz has to choose spatial partitioning, as it is the only technique that groups nearby records together in one partition so that they can be fused.
If HadoopViz doesn't need to apply a smoothing function, both techniques are applicable, and there is a trade-off, depending on the image size, between the partitioning and merging steps. The default HDFS partitioning is faster than spatial partitioning, but the overlay step is more expensive than stitching due to the huge sizes of the intermediate images. Therefore, HadoopViz chooses spatial partitioning when the generated image is large, as the cost of the overlay step becomes dominant.
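
To make this concrete, the decision can be summarized by a simple rule of thumb. The sketch below only illustrates the logic described above; the class name, method, and pixel threshold are placeholders, not HadoopViz's actual cost model.

// Illustrative sketch of the partitioning decision described above.
// The threshold is a made-up placeholder, not HadoopViz's actual cost model.
public class PartitioningChoice {
  public enum Method { DEFAULT_HDFS, SPATIAL }

  // A placeholder cutoff on the number of output pixels.
  private static final long LARGE_IMAGE_PIXELS = 32_000L * 32_000L;

  public static Method choose(long imageWidth, long imageHeight, boolean needsSmoothing) {
    // Smoothing requires nearby records to end up in the same partition.
    if (needsSmoothing) {
      return Method.SPATIAL;
    }
    // For huge images, overlaying large partial images dominates the cost,
    // so spatial partitioning (stitching small images) wins.
    return imageWidth * imageHeight > LARGE_IMAGE_PIXELS
        ? Method.SPATIAL : Method.DEFAULT_HDFS;
  }
}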

Multilevel images

A multilevel image consists of a pyramid of fixed-size tiles, typically of 256x256 pixels each. The figure below shows an example of a three-level image with 1, 4, and 16 tiles in its three levels, i.e., a pyramid of three levels.
A multilevel image with three levels
A naive way to generate a multilevel image is to generate each tile independently using the single-level techniques described above. However, this would require executing the single-level algorithm millions of times. Therefore, HadoopViz provides specialized multilevel visualization algorithms that take the pyramid structure into consideration. Similar to single-level visualization, HadoopViz supports two partitioning techniques, namely, default HDFS partitioning and pyramid partitioning.

Default HDFS Partitioning

If we use default HDFS partitioning, each partition might contain records from all over the input space. In this case, each machine plots all these records to all overlapping tiles in all pyramid levels. The generated tiles are considered partial images as multiple partitions might overlap the same tile. Thus, a final merge step will need to overlay all intermediate partial images for the same tile to produce the final image for that tile.

Pyramid Partitioning

The other option for HadoopViz is to first repartition the data so that all records that overlap with one tile go to one partition. Then, these records are visualized to generate the final image for that tile. No merging is needed here as each tile is only generated by one machine.

Which partitioning technique is better?

Again, there is no clear winner here. It all depends on how many tiles are generated. If only a few tiles are generated, then default HDFS partitioning is better as it only needs to merge a few images. However, if a huge number of tiles are generated, pyramid partitioning is better as it avoids altogether the need for merging intermediate tiles.
HadoopViz splits a huge pyramid into two parts, the top and the base. The top of the pyramid contains only a few tiles and is generated using the default HDFS partitioning technique, while the base contains the vast majority of the tiles and is generated using the pyramid partitioning technique. The tiles are then put together to produce the final image without any extra processing.


While the above techniques can be customized for every visualization type, it would require a huge coding effort to build and maintain all these implementations. Therefore, HadoopViz proposes a visualization abstraction that is used to describe the visualization logic. This abstraction is then plugged into generic implementations of the above algorithms to produce the image efficiently at scale. In short, if you would like to visualize your own data in a new way, all you need to do is write a small class that extends an abstract class, and you're ready to go with both single-level and multilevel visualization techniques.
A new visualization type is defined by extending the base class Plotter. There are five main functions that you need to implement for a new visualization type.

<S extends Shape> Iterable<S> smooth(Iterable<S> r)

This function takes a set of nearby records, fuses them together, and returns a new set of records. This function can be used to apply a user-specified smoothing logic.

Canvas createCanvas(int width, int height, Rectangle mbr)

This function initializes an empty canvas with the given size in width and height. It also associates this canvas with the given MBR in input space. Notice that Canvas can be virtually anything. We provide a simple abstract Canvas class as a skeleton.

void plot(Canvas layer, Shape shape)

The plot function updates the canvas layer by plotting the given shape on it. Users can define their own visualization logic for one shape based on the format of the shape and the canvas layer.

void merge(Canvas finalLayer, Canvas intermediateLayer)

The merge function merges two intermediate canvases. It updates the finalLayer by merging the intermediateLayer into it.

void writeImage(Canvas layer, DataOutputStream out, boolean vflip)

The writeImage function encodes the canvas layer into a standard image that can be displayed to the end user. The image is written to the given DataOutputStream, which typically goes to an output file. If the vflip flag is set to true, the image should be vertically flipped before being written to the output. The vflip flag is useful when the y-axis of the input runs in a different direction than that of the final image. For example, in PNG images the y-axis increases from top to bottom, while in geographical coordinates latitude increases from bottom (south) to top (north).
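
To make the abstraction concrete, here is a minimal, self-contained sketch of a scatter-plot visualizer that mirrors the five functions above. It does not use the real SpatialHadoop classes; Plotter, Canvas, and Shape are replaced by simple stand-ins so the sketch compiles on its own, and a real implementation would extend Plotter instead. The smooth function is omitted because a plain scatter plot needs no smoothing.

import java.awt.image.BufferedImage;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.imageio.ImageIO;

// A minimal sketch that mirrors the Plotter functions described above.
// Class names are stand-ins, not SpatialHadoop's actual classes.
public class ScatterPlotSketch {

  /** Stand-in for SpatialHadoop's Canvas: a raster plus the region it covers. */
  public static class RasterCanvas {
    final BufferedImage image;
    final double x1, y1, x2, y2; // MBR of the input space covered by this canvas

    RasterCanvas(int width, int height, double x1, double y1, double x2, double y2) {
      this.image = new BufferedImage(width, height, BufferedImage.TYPE_INT_ARGB);
      this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }
  }

  /** Stand-in for a point Shape. */
  public static class Point {
    final double x, y;
    Point(double x, double y) { this.x = x; this.y = y; }
  }

  // createCanvas: initialize an empty canvas associated with the given MBR
  public RasterCanvas createCanvas(int width, int height,
                                   double x1, double y1, double x2, double y2) {
    return new RasterCanvas(width, height, x1, y1, x2, y2);
  }

  // plot: set the pixel that corresponds to the point's location
  public void plot(RasterCanvas layer, Point p) {
    int px = (int) ((p.x - layer.x1) * layer.image.getWidth() / (layer.x2 - layer.x1));
    int py = (int) ((p.y - layer.y1) * layer.image.getHeight() / (layer.y2 - layer.y1));
    if (px >= 0 && px < layer.image.getWidth() && py >= 0 && py < layer.image.getHeight()) {
      layer.image.setRGB(px, py, 0xFF000000); // opaque black pixel
    }
  }

  // merge: overlay the intermediate layer on top of the final layer
  public void merge(RasterCanvas finalLayer, RasterCanvas intermediateLayer) {
    finalLayer.image.getGraphics().drawImage(intermediateLayer.image, 0, 0, null);
  }

  // writeImage: encode the canvas as a PNG, flipping vertically if requested
  public void writeImage(RasterCanvas layer, DataOutputStream out, boolean vflip)
      throws IOException {
    BufferedImage img = layer.image;
    if (vflip) {
      BufferedImage flipped = new BufferedImage(img.getWidth(), img.getHeight(), img.getType());
      for (int y = 0; y < img.getHeight(); y++)
        for (int x = 0; x < img.getWidth(); x++)
          flipped.setRGB(x, img.getHeight() - 1 - y, img.getRGB(x, y));
      img = flipped;
    }
    ImageIO.write(img, "png", out);
  }
}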


This work was partially supported by an AWS in Education Grant.

Further References

  1. SpatialHadoop homepage:
  2. Ahmed Eldawy, Mohamed F. Mokbel and Christopher Jonathan "HadoopViz: A MapReduce Framework for Extensible Visualization of Big Spatial Data". In Proceedings of the 32nd IEEE International Conference on Data Engineering, IEEE ICDE 2016, Helsinki, Finland, May 16-20, 2016
  3. Ahmed Eldawy, Mohamed F. Mokbel and Christopher Jonathan "A Demonstration of HadoopViz: An Extensible MapReduce System for Visualizing Big Spatial Data". In Proceedings of the International Conference on Very Large Databases, VLDB 2015, Kohala Coast, HI, 2015

Wednesday, December 2, 2015

Voronoi diagram and Delaunay triangulation construction for Big Spatial Data using SpatialHadoop

Voronoi Diagram and Delaunay Triangulation

A very popular computational geometry problem is the Voronoi Diagram (VD) and its dual, the Delaunay Triangulation (DT). In both cases, the input is a set of points (sites). In a VD, the output is a tessellation of the space into convex polygons, one per input site, such that each polygon covers all locations that are closer to the corresponding site than to any other site. In a DT, the output is a triangulation in which each triangle connects three sites such that the circumcircle of each triangle does not contain any other site. The two constructs are dual in the sense that each edge in the DT connects two sites whose regions share a common edge in the VD.

Computation Overhead

These two problems, VD and DT, are known to be computationally intensive. There are several O(n log n) algorithms for both, but they are mainly single-machine, main-memory algorithms and cannot scale to the current sizes of Big Spatial Data. For example, earth-surface modeling is usually done via a Delaunay triangulation of elevation data. With the increasing resolution of data collected through LiDAR, we might need to construct a Delaunay triangulation of a few trillion points, which cannot be loaded on a single machine.

SpatialHadoop and CG_Hadoop

SpatialHadoop provides a computational geometry library, named CG_Hadoop, which includes MapReduce algorithms for both VD and DT. These algorithms can scale to trillions of points by distributing the computation over a Hadoop cluster with up to thousands of machines. SpatialHadoop is able to construct the DT of 2.7 billion points extracted from OpenStreetMap in less than an hour using only 20 machines.

Single-machine Divide and Conquer (D&C) Algorithm

The SpatialHadoop MapReduce algorithm is based on the classical main-memory divide-and-conquer algorithm. This traditional algorithm, originally proposed by Guibas and Stolfi, divides the input sites into two subsets using a straight line, usually a vertical line, computes the DT of each subset recursively, and then merges the two partial DTs. This algorithm requires all the data to fit in main memory, which is not possible for big spatial datasets.

CG_Hadoop MapReduce Algorithm

A straightforward implementation of the D&C algorithm in MapReduce wouldn't work because of the overhead of the merge step. The final merge has to be done on a single machine and would require loading the entire final DT in its main memory, which takes us back to the limitation of the single-machine algorithm. Alternatively, CG_Hadoop proposes a novel approach that employs the following two techniques:
  1. Instead of partitioning the space into two, CG_Hadoop partitions the space using one of the SpatialHadoop partitioning methods, e.g., Grid or R-tree, so that it achieves a higher level of parallelism by assigning each partition to one of the machines.
  2. CG_Hadoop detects early the portions of the VD that are final, i.e., that will never be affected or used by the subsequent merge process. In other words, CG_Hadoop detects which Voronoi regions, or Delaunay triangles, might be affected by the merge step, and sends only those portions to the merge step.
These two techniques make the construction of both the VD and the DT much more scalable, as each machine computes a partial VD one partition at a time. Each machine then detects the parts of its partial VD that will not be affected by the subsequent merge steps and writes them to the output early. Furthermore, the merge step becomes much faster and more scalable, as it deals with only a few non-final parts of the VD rather than all of it.
The figure below illustrates the main idea of the pruning technique. The input points are partitioned using a uniform grid into four partitions. The VD of each partition is computed on a separate machine in the map function. Each machine detects final Voronoi regions, marked in green, and writes them to the final output. Only non-final regions, marked in red, are sent to the next merge step.

Local VD step. Each mapper computes a partial VD for one partition and writes final Voronoi Regions (green) to the output. Non-final regions (red) are sent to the next step for merging

How to Use it

The above algorithm ships with the recent version of SpatialHadoop, available on GitHub. First, you need to spatially partition the input set of points using the 'index' command. For example:
shadoop index input.points shape:point input.grid sindex:grid
Then, you will call the DT operation as follows:
shadoop dt input.grid output.dt shape:point
It launches a single MapReduce job that generates the DT of the input set of points.

More Details

The VD algorithm runs in four steps, namely, partitioning, local VD, vertical merge, and horizontal merge.

1. Partitioning

The partitioning step uses any space partitioning technique supported by SpatialHadoop, such as a uniform grid index.

2. Local VD

In the local VD step, each partition is assigned to one machine, which computes the VD of all points in that partition. We use the traditional Guibas and Stolfi D&C algorithm, as each partition is small enough to fit in main memory. A more advanced algorithm, such as Dwyer's algorithm, could also be used; we leave this as an exercise for interested readers. Upon computing the local VD, each machine detects the final Voronoi regions, removes them from the VD, and writes them to the final output. To detect final regions, we use a very simple, yet efficient, rule that is derived from the VD definition: a Voronoi region is final if no site outside the partition could possibly be closer to any point of that region than the region's own generator site. The following figure shows two examples of final and non-final regions.
A final Voronoi region. The nearest possible site outside the partition is still farther away than the generator site of that region
A non-final Voronoi region. There could be a site (S2) in another partition that is closer to some (shaded) areas in that Voronoi region.
To find all non-final regions quickly, without exhaustively applying this rule to all regions, we run a breadth-first search (BFS) from the outer regions that expands only into non-final regions. The trick is that all non-final regions form a contiguous area towards the exterior of the VD. In this way, a VD of 1.4 million regions can be pruned by applying the rule to only 7K regions.
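
As an illustration of this rule (not CG_Hadoop's actual code), the sketch below conservatively tests whether a single Voronoi region is final: at every vertex of the region, the nearest possible site outside the partition must still be farther away than the region's own generator site. The Box and isFinal names are placeholders introduced only for this sketch, and unbounded outer regions are assumed to have been marked non-final already.

// A sketch of the pruning rule described above; an illustration, not CG_Hadoop's code.
public class VoronoiPruneSketch {

  /** Axis-aligned partition boundary. */
  public static class Box {
    final double x1, y1, x2, y2;
    Box(double x1, double y1, double x2, double y2) {
      this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }
    /**
     * Distance from a point to the closest edge of the box. Negative if the
     * point lies outside the box, which correctly forces "non-final" below.
     */
    double distanceToBoundary(double x, double y) {
      return Math.min(Math.min(x - x1, x2 - x), Math.min(y - y1, y2 - y));
    }
  }

  /**
   * Returns true if the Voronoi region with the given vertices and generator
   * site cannot be affected by any site outside the partition.
   */
  public static boolean isFinal(double[] vx, double[] vy,
                                double siteX, double siteY, Box partition) {
    for (int i = 0; i < vx.length; i++) {
      double dSite = Math.hypot(vx[i] - siteX, vy[i] - siteY);
      double dBoundary = partition.distanceToBoundary(vx[i], vy[i]);
      // The nearest possible outside site is dBoundary away from this vertex;
      // if that could be closer than the generator site, the region is not final.
      if (dBoundary < dSite) {
        return false;
      }
    }
    return true;
  }
}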

3. Vertical Merge

In this step, the partial VDs that lie on the same column of the partition grid are merged into vertical strips. This step can be done in parallel, as each machine is assigned all the partial VDs in one vertical strip. Notice that each machine receives and processes only the non-final regions of its assigned partial VDs, which reduces the memory and computation overhead. After merging them, the pruning rule is applied again to remove further regions that have become final and will not be needed later, as shown in the figure below.
The vertical merge step. Each vertical strip is processed by a reducer which merges all partial VDs in that strip. Final regions in the strip (blue) are directly written to the output while non-final regions (red) are sent to the final merge step.

4. Horizontal Merge

In this final step, all the partial VDs computed for the vertical strips are sent to one machine, which merges them to compute the final answer. Again, only non-final regions are processed at this step, which makes it feasible to run on a single machine. As shown in the figure below, all resulting regions are written to disk as there are no further merge steps.
The final horizontal merge step. All resulting Voronoi regions are written to the final output as there are no further merge steps.

The Voronoi regions written by the three steps comprise the final VD, as shown in the figure below.
The final results. Green, blue and black regions are the ones written by the local VD, vertical merge, and horizontal merge steps, respectively.

Performance Evaluation

We ran the above algorithm on Amazon EC2 using a real dataset extracted from OpenStreetMap, and compared it to the standard D&C algorithm running on a machine with 1TB of memory, which we have access to through the Minnesota Supercomputing Institute (MSI). On 1 billion points, CG_Hadoop is 30 times faster than the single machine. Furthermore, the single machine fails to process the 2.7 billion point dataset, while CG_Hadoop finishes the computation in around 40 minutes.


The work on Voronoi diagrams was done with Nadezda Weber while she was studying at the University of Minnesota. She is currently a graduate student at Oxford University.
The experiments were run on an Amazon EC2 cluster and were supported by AWS in Education Grant award.


Monday, November 30, 2015

Reducing the memory footprint of the spatial join operator in Hyracks

This is the fourth blog post in a series that describes how to build an efficient spatial join Hyracks operator in AsterixDB. You can refer to the previous posts below:
  1. An Introduction to Hyracks Operators in AsterixDB
  2. Your first Hyracks operator
  3. A Hyracks operator for plane-sweep join

Scope of this post

In the third post, I described how to implement an efficient plane-sweep join algorithm in a Hyracks operator. That implementation simply caches all the data frames (i.e., all the records) in memory before running the plane-sweep algorithm. As the input datasets grow larger, this approach might require a huge memory footprint, which is not acceptable for the big data handled by AsterixDB. In this blog post, I will describe how to improve the previous operator to run with a limited memory budget.

Main idea

The plane-sweep join algorithm can be considered the two-dimensional version of the sort-merge join algorithm. After sorting the two lists, you synchronously iterate over them with two pointers, always advancing the pointer that points to the smaller key. This means that you do not really need to keep the two whole datasets around for the algorithm to work; you only need to look at a small window of records in each dataset. While the two datasets can be arbitrarily large, this window remains, in most cases, of a fixed size. The improvement described in this post revolves around keeping only the records of this small window in memory.
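
Before diving into the code, here is a one-dimensional illustration of that idea in plain Java, unrelated to the Hyracks classes used below: a sort-merge join over two sorted arrays that only ever looks at the current window of each input, marking a position on one side so it can rescan duplicates, much like the mark/reset calls used by the operator. The class and method names are invented for this sketch only.

// A one-dimensional illustration (not the Hyracks code) of the idea described
// above: when joining two sorted lists, only a small "window" of each list is
// needed at any time, so everything before the window can be evicted.
public class SortMergeWindowSketch {
  public static void join(int[] r, int[] s) {
    int i = 0, j = 0;
    while (i < r.length && j < s.length) {
      if (r[i] < s[j]) {
        i++;              // advance the pointer with the smaller key;
      } else if (r[i] > s[j]) {
        j++;              // r[0..i-1] and s[0..j-1] are never needed again
      } else {
        // Equal keys: report all matching pairs, then move on.
        int key = r[i];
        int jStart = j;   // mark, so we can rescan the duplicates in s
        while (i < r.length && r[i] == key) {
          for (j = jStart; j < s.length && s[j] == key; j++) {
            System.out.println(r[i] + " joins " + s[j]);
          }
          i++;
        }
      }
    }
  }

  public static void main(String[] args) {
    join(new int[]{1, 3, 3, 7}, new int[]{3, 3, 5, 7});
  }
}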


As usual, the first step is to refactor the current code to prepare it for the new logic.


First, we extracted the CachedFrameWriter class into a separate file and improved it to work with a limited memory budget. It now uses a CircularQueue, which keeps all the queue elements in a fixed-size array. We also added a new feature to the CachedFrameWriter that automatically clears all cached frames before the mark point: whenever the plane-sweep algorithm puts a mark, the CachedFrameWriter treats it as an indication that earlier frames will never be accessed again by the plane-sweep join algorithm.
public void mark() {
  // This mark indicates that we do not need to go back beyond this point.
  // We can shrink our queue now to accommodate new data frames.
  // (The call that actually drops the frames before the mark from the
  //  circular queue is omitted here; see the accompanying source code.)
  this.markFrame = this.currentFrame = 0;
  this.markRecord = this.currentRecord;
}
We also added a couple more functions that will be used by the plane-sweep join operator.
public boolean canGrowInMemory() {
  return !cachedFrames.isFull() && !reachedEndOfStream;
}

/** Whether the end of the input has been reached (no more frames to read) */
public boolean isComplete() {
  return reachedEndOfStream;
}

/** Returns true when there are no more cached frames to iterate over */
public boolean noMoreImmediatelyAvailableRecords() {
  return this.currentFrame >= this.cachedFrames.size();
}
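
The CircularQueue itself is not shown in this post; the sketch below is a rough idea of what such a fixed-capacity queue could look like, with a removeFirst operation that drops the oldest frames (for example, everything before a mark). The actual class in the accompanying source code may differ.

import java.util.NoSuchElementException;

// A rough sketch of a fixed-capacity circular queue like the one mentioned
// above; it only illustrates keeping all elements in a fixed-size array.
public class CircularQueueSketch<T> {
  private final Object[] elements;
  private int head = 0;  // index of the oldest element
  private int size = 0;  // number of elements currently stored

  public CircularQueueSketch(int capacity) {
    elements = new Object[capacity];
  }

  public boolean isFull() { return size == elements.length; }
  public int size()       { return size; }

  /** Appends an element at the tail; fails if the queue is full. */
  public void add(T element) {
    if (isFull()) throw new IllegalStateException("queue is full");
    elements[(head + size) % elements.length] = element;
    size++;
  }

  /** Returns the i-th element counted from the oldest one. */
  @SuppressWarnings("unchecked")
  public T get(int i) {
    if (i < 0 || i >= size) throw new NoSuchElementException();
    return (T) elements[(head + i) % elements.length];
  }

  /** Drops the n oldest elements, e.g., all frames before a mark. */
  public void removeFirst(int n) {
    if (n > size) throw new NoSuchElementException();
    for (int i = 0; i < n; i++) {
      elements[(head + i) % elements.length] = null; // allow GC
    }
    head = (head + n) % elements.length;
    size -= n;
  }
}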

Incremental PlaneSweepJoin Helper Class

We modified the helper class that actually performs the plane-sweep join algorithm to be stateful so that it can run incrementally. It keeps track of its state and can continue from where it stopped. This allows the plane-sweep operator to run a few iterations of the plane-sweep join, which advance the pointers on the two datasets. Based on this advance, it can evict some data frames from memory to accommodate more frames, and then continue from where it stopped. This also required changing the plane-sweep method into an instance method, rather than a class method, so that it can access the state stored in the instance. The PlaneSweepJoin class now has the following instance variables.
public class PlaneSweepJoin {
  private IHyracksTaskContext ctx;
  private CachedFrameWriter[] datasets;
  private IFrameWriter outputWriter;
  private ITuplePairComparator rx1sx1;
  private ITuplePairComparator rx1sx2;
  private ITuplePairComparator sx1rx2;
  private IPredicateEvaluator predEvaluator;
  private FrameTupleAppender appender;

  /**
   * An enumerated type that stores the current state of the spatial join
   * algorithm so that it can continue from where it stopped.
   * (Only SJ_FINISHED is referenced later in this post; the other constant
   * names are placeholders.)
   */
  public enum SJ_State {
    /** The spatial join algorithm did not start yet */
    SJ_NOT_STARTED,
    /** The current record in dataset 0 is active
     *  and needs to be tested with more records in dataset 1 */
    SJ_DATASET0_ACTIVE,
    /** The current record in dataset 1 is active
     *  and needs to be tested with more records in dataset 0 */
    SJ_DATASET1_ACTIVE,
    /** No active records */
    SJ_IDLE,
    /** The spatial join algorithm has finished. No more work to do. */
    SJ_FINISHED
  }

  /** Current state of the spatial join algorithm */
  private SJ_State sjState;
Up to this point, we haven't made any major changes to the actual plane-sweep join method. It works as before; it just keeps its state in instance variables instead of local variables.

Remodeled PlaneSweepJoinOperatorDescriptor

Now comes the important part of this blog post, where we modify the plane-sweep operator to make it more memory friendly. The main modification is the timing of when the plane-sweep function is called. Previously, it was called once, after the two inputs had been cached completely. Now, it is called more often to do part of the join and free up some memory. Specifically, we modified the CachedFrameWriter#nextFrame method to call the plane-sweep join as soon as the in-memory buffer becomes full.
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
  // Store this buffer in memory for later use
  ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
  FrameUtils.copyAndFlip(buffer, copyBuffer);
  if (cachedFrames.isFull()) {
    // Run the plane-sweep algorithm in case it can free some buffer entries
    try {
      owner.getPlaneSweepJoin().planesweepJoin(this);
      if (owner.getPlaneSweepJoin().getState() == PlaneSweepJoin.SJ_State.SJ_FINISHED) {
        return; // the join is already done; no need to cache more frames
      }
    } catch (InterruptedException e) {
      throw new HyracksDataException(e);
    }
    // TODO If after running the plane-sweep, we still cannot find empty entries,
    // we should start spilling records to disk.
    if (cachedFrames.isFull())
      throw new HyracksDataException("Memory full");
  }
  cachedFrames.add(copyBuffer);
}
Notice that there are two instances of CachedFrameWriter, one for each input dataset. Therefore, the PlaneSweepJoin#planesweepJoin method can be called by either of the two CachedFrameWriters, and it has to be modified internally to synchronize these calls. In the code shown above, a CachedFrameWriter calls the planesweepJoin method when its memory buffer is full. The method then waits until the other CachedFrameWriter also has a full memory buffer, carries out the spatial join, releases some memory buffers, and returns the call back to the sender. This part is shown in the following code snippet.
public synchronized void planesweepJoin(CachedFrameWriter fullDataset)
        throws HyracksDataException, InterruptedException {
  // No more work to do
  if (sjState == SJ_State.SJ_FINISHED)
    return;
  CachedFrameWriter otherDataset =
      fullDataset == datasets[0] ? datasets[1] : datasets[0];
  if (otherDataset.canGrowInMemory()) {
    // Block until the other dataset also has a full memory buffer
    wait();
    return; // the thread that woke us up has already performed the join
  }
  // ... Perform the actual spatial join logic here

  if ((datasets[0].isComplete() &&
       datasets[0].noMoreImmediatelyAvailableRecords()) ||
      (datasets[1].isComplete() &&
       datasets[1].noMoreImmediatelyAvailableRecords())) {
    sjState = SJ_State.SJ_FINISHED;
  }
  if (sjState == SJ_State.SJ_FINISHED) {
    appender.flush(outputWriter, true);
  }
  // To wake up the other thread
  notify();
}
In the code above, the caller passes its own CachedFrameWriter, so the planesweepJoin method knows whether it needs to block and wait until the other dataset also has a full memory buffer, or whether it can go ahead with the join logic right away. Since there are only two datasets, the planesweepJoin method determines the other dataset and waits if that dataset can still grow in memory (i.e., its buffer is not full yet). This is the case when the first CachedFrameWriter fills up while the second one still has room. Once the second CachedFrameWriter also fills up, the condition no longer matches and the method carries out the actual spatial join code. At the very end, after the whole join step is finished, the notify() method is called to wake up the other, sleeping thread. That sleeping thread returns immediately, as it knows that the thread which woke it up has already performed the spatial join.
In this execution pattern, the spatial join can be thought of as a cooperative task that either of the two threads can perform whenever possible. This is usually better than spawning a third thread that keeps joining the records inserted by the other two threads. It also fits the architecture of AsterixDB better, as the underlying framework is the one responsible for creating threads.


To test that this code works, we execute the spatial join operation against two large datasets with a memory budget that cannot hold the two datasets in full. In this case, the operator has to execute multiple rounds of the spatial join algorithm to get the work done. The test is similar to the previous one, but it has an additional parameter that limits the memory usage.
PlaneSweepJoinOperatorDescriptor join =
  new PlaneSweepJoinOperatorDescriptor(spec,
    new X1X1ComparatorI(), new X1X2ComparatorI(),
    new X1X2ComparatorI(), outputDesc, 10,
    new SpatialOverlapPredicateI());
The sixth parameter (with value 10) tells the PlaneSweepJoinOperatorDescriptor that it has at most 10 memory buffers to cache the input datasets, while the datasets need at least 20 buffers to be completely cached in memory. Of course, in a real environment we would probably have many more memory buffers.
I also added a new case in which the two inputs are not sorted; it uses the existing Hyracks sort operator to sort both inputs before feeding them to the spatial join operator. You can find the source code of this new test case in the downloads section below.


As usual, all the source code of this blog post can be found here.

Tuesday, November 24, 2015

A Hyracks operator for plane-sweep join

This is the third post in a series about creating an efficient Hyracks operator for spatial join. In the previous two posts, we gave an introduction to Hyracks operators and briefly described how to write a simple Hyracks operator. In this post, we describe how to make that operator more efficient by using a plane-sweep spatial join algorithm instead of a naive nested-loop algorithm.

Scope of this blog post

In this blog post, we will focus on improving the operator we created in the last post by replacing the nested-loop join subroutine with the more efficient plane-sweep join algorithm. In addition, we will do some minor code refactoring to keep the code organized. For simplicity, we assume that the two inputs are already sorted on the x coordinate, which can be done using one of the sort operators that ship with AsterixDB, e.g., ExternalSortOperatorDescriptor.

Code Refactor

Let's first quickly describe the code refactoring we did to make the code more organized.

Self-contained CacheFrameWriter

I made the CacheFrameWriter self-contained by moving the RecordDescriptor and the FrameTupleAccessor inside it. This makes it easier to iterate over the records it caches and avoids repeating the iteration code for the two input datasets.
public class CacheFrameWriter implements IFrameWriter {
  /** {@link FrameTupleAccessor} to iterate over records */
  protected FrameTupleAccessor fta;
  /** {@link RecordDescriptor} for cached data */
  private RecordDescriptor rd;
I also added the necessary attributes and methods to iterate over cached records.
/** The current frame being accessed */
private int currentFrame;
/** The index of the record inside the current frame being accessed */
protected int currentRecord;

public void init() {
  this.currentFrame = 0;
  this.currentRecord = 0;
  this.fta = new FrameTupleAccessor(rd);
  // Point the accessor at the first cached frame, if any
  if (cachedFrames.size() > 0)
    this.fta.reset(cachedFrames.get(0));
}

/** Returns true if end-of-list has been reached */
public boolean isEOL() {
  return this.currentFrame >= this.cachedFrames.size();
}

public void next() {
  currentRecord++;
  // Skip to the next frame if we reached the end of the current frame
  while (currentRecord >= fta.getTupleCount() && currentFrame < cachedFrames.size()) {
    currentFrame++;
    if (currentFrame < cachedFrames.size()) {
      // Move to next data frame
      fta.reset(cachedFrames.get(currentFrame));
      currentRecord = 0;
    }
  }
}
Finally, I added the mark/reset functionality to the CacheFrameWriter so that I can mark the current position and reset to it later.
/** The index of the marked frame */
private int markFrame;
/** The index of the marked record inside the marked frame */
private int markRecord;

/** Put a mark on the current record being accessed */
public void mark() {
  this.markFrame = this.currentFrame;
  this.markRecord = this.currentRecord;
}

/** Reset the iterator to the last marked position */
public void reset() {
  this.currentFrame = this.markFrame;
  this.currentRecord = this.markRecord;
}

Make CacheFrameWriter a subclass

I also moved the CacheFrameWriter inside the PlaneSweepActivityNode class. This allowed me to remove the unnecessary Notifiable interface and to cascade the failure of the input to the output in the CacheFrameWriter#fail method.
public void fail() throws HyracksDataException {; // Cascade the failure to the output
  cachedFrames = null; // To prevent further insertions
}

Extract the spatial join algorithm

We also extracted the spatial join algorithm, currently nested-loop, to a helper class called PlaneSweepJoin.
public class PlaneSweepJoin {
  public static void planesweepJoin(IHyracksTaskContext ctx,
      CacheFrameWriter[] datasets, IFrameWriter outputWriter,
      IPredicateEvaluator predEvaluator) throws HyracksDataException {
    // Spatial join code goes here
  }
}
I made planesweepJoin a static method to keep it completely isolated and to avoid creating unnecessary objects. In the PlaneSweepJoinOperatorDescriptor class, this method is called from the PlaneSweepActivityNode#notify method, which has also been renamed to #inputComplete.
public void inputComplete() throws HyracksDataException {
  // A notification that one of the inputs has been completely read
  numInputsComplete++;
  if (numInputsComplete == 2) {
    PlaneSweepJoin.planesweepJoin(ctx, datasets, outputWriter, predEvaluator);
  }
}

Create the necessary comparators

In order to run the plane-sweep join algorithm, we need to compare records from the two datasets based on their x coordinates. Since all records are kept in their binary format, we need helper functions that compare these records without having to parse them. Since, in our case, the two input datasets have the same format, we only need to create two comparators: one that compares the left edges (x1) of two records, and one that compares the left edge (x1) of one record to the right edge (x2) of the other record. The implementation of one of the comparators is shown below; the other can be implemented similarly.
public static class X1X1ComparatorD implements ITuplePairComparator, Serializable {
  public int compare(IFrameTupleAccessor fta0, int tupId0,
                     IFrameTupleAccessor fta1, int tupId1)
                     throws HyracksDataException {
    // Read the coordinates of the first rectangle
    ByteBuffer buf = fta0.getBuffer();
    double r0_x1 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 1));
    // Read the coordinates of the second rectangle
    buf = fta1.getBuffer();
    double r1_x1 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 1));

    if (r0_x1 < r1_x1) return -1;
    if (r0_x1 > r1_x1) return 1;
    return 0;
  }
}
These comparators are passed to the constructor of the PlaneSweepJoinOperatorDescriptor, which in turn passes them to the PlaneSweepJoinActivityNode.
public PlaneSweepJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
        ITuplePairComparator rx1sx1, ITuplePairComparator rx1sx2,
        ITuplePairComparator sx1rx2, RecordDescriptor outputDescriptor,
        IPredicateEvaluator predEvaluator) {
  super(spec, 2, 1);
  this.rx1sx1 = rx1sx1;
  this.rx1sx2 = rx1sx2;
  this.sx1rx2 = sx1rx2;
  // The rest of the constructor goes here ...
}

public void inputComplete() throws HyracksDataException {
  // A notification that one of the inputs has been completely read
  numInputsComplete++;
  if (numInputsComplete == 2) {
    PlaneSweepJoin.planesweepJoin(ctx, datasets, outputWriter,
        rx1sx1, rx1sx2, sx1rx2, predEvaluator);
  }
}

Notice that the join operator needs three comparators because the formats of the two datasets might be different, which means that the comparator that compares x1 of the left dataset to x2 of the right dataset could be different from the comparator that compares x1 of the right dataset to x2 of the left dataset.

The Plane-sweep Join function

Finally, the plane-sweep join function is used in place of the nested-loop join function. The function is shown below.
public static void planesweepJoin(IHyracksTaskContext ctx,
    CacheFrameWriter[] datasets, IFrameWriter outputWriter,
    ITuplePairComparator rx1sx1, ITuplePairComparator rx1sx2,
    ITuplePairComparator sx1rx2, IPredicateEvaluator predEvaluator)
    throws HyracksDataException {
  FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
  // The two inputs have been completely read. Do the join
  while (!datasets[0].isEOL() && !datasets[1].isEOL()) {
    // Move the sweep line to the lower of the two current records
    if ([0].fta, datasets[0].currentRecord,
           datasets[1].fta, datasets[1].currentRecord) < 0) {
      // Sweep line stops at an r record (dataset 0)
      // Current r record is called the active record
      // Scan records in dataset 1 until we pass the right
      // edge of the active record. i.e., while (s.x1 < r.x2)

      // Mark the current position at dataset 1 to return to it later
      datasets[1].mark();

      while (!datasets[1].isEOL() &&
    [1].fta, datasets[1].currentRecord,
              datasets[0].fta, datasets[0].currentRecord) < 0) {
        // Check if r and s overlap
        if (predEvaluator.evaluate(
              datasets[0].fta, datasets[0].currentRecord,
              datasets[1].fta, datasets[1].currentRecord)) {
          // Report this pair to the answer
          FrameUtils.appendConcatToWriter(outputWriter, appender,
                  datasets[0].fta, datasets[0].currentRecord,
                  datasets[1].fta, datasets[1].currentRecord);
        }
        // Move to next record in s
        datasets[1].next();
      }

      // Reset to the old position of dataset 1
      datasets[1].reset();
      // Move to the next record in dataset 0
      datasets[0].next();
    } else {
      // Sweep line stops at an s record (dataset 1)
      // Current s record is called the active record
      // Scan records in dataset 0 until we pass the right
      // edge of the active record. i.e., while (r.x1 < s.x2)

      // Mark the current position at dataset 0 to return to it later
      datasets[0].mark();

      while (!datasets[0].isEOL() &&
    [0].fta, datasets[0].currentRecord,
               datasets[1].fta, datasets[1].currentRecord) < 0) {
        // Check if r and s overlap
        if (predEvaluator.evaluate(
               datasets[0].fta, datasets[0].currentRecord,
               datasets[1].fta, datasets[1].currentRecord)) {
          // Report this pair to the answer
          FrameUtils.appendConcatToWriter(outputWriter, appender,
                     datasets[0].fta, datasets[0].currentRecord,
                     datasets[1].fta, datasets[1].currentRecord);
        }
        // Move to next record in r
        datasets[0].next();
      }

      // Reset to the old position of dataset 0
      datasets[0].reset();
      // Move to the next record in dataset 1
      datasets[1].next();
    }
  }
  // Finalize output writing
  appender.flush(outputWriter, true);
}

Benefits of using the comparator interface

While using the comparator interface might not be as straightforward as working with the actual records, it has two main benefits. First, it is more efficient, as it does not have to instantiate and parse whole records when all comparisons are done on only two attributes, x1 and x2. Second, it makes the join algorithm oblivious to the underlying data format, which means that one can easily use records of a different format without having to modify the spatial join code. As an example, I added a second test case, found in the downloads section, which runs the same spatial join against rectangles represented with integer coordinates rather than doubles.
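
For reference, the integer-based variant of the first comparator could look like the sketch below. This is only my guess at what the X1X1ComparatorI class used in the previous test code does, mirroring X1X1ComparatorD above with getInt instead of getDouble; the exact field layout is an assumption.

// A sketch of an integer-based x1-to-x1 comparator, mirroring X1X1ComparatorD
// above. The field layout (x1 stored as a 4-byte int in field 1) is an assumption.
public static class X1X1ComparatorI implements ITuplePairComparator, Serializable {
  public int compare(IFrameTupleAccessor fta0, int tupId0,
                     IFrameTupleAccessor fta1, int tupId1)
                     throws HyracksDataException {
    ByteBuffer buf = fta0.getBuffer();
    int r0_x1 = buf.getInt(fta0.getAbsoluteFieldStartOffset(tupId0, 1));
    buf = fta1.getBuffer();
    int r1_x1 = buf.getInt(fta1.getAbsoluteFieldStartOffset(tupId1, 1));
    return, r1_x1);
  }
}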


You can download the full source code for this blog post here.