Monday, November 30, 2015

Reducing the memory footprint of the spatial join operator in Hyracks

This is the fourth blog post in a series that describes how to build an efficient spatial join Hyracks operator in AsterixDB. You can refer to the previous posts below:
  1. An Introduction to Hyracks Operators in AsterixDB
  2. Your first Hyracks operator
  3. A Hyracks operator for plane-sweep join

Scope of this post

In the third post, I described how to implement an efficient plane-sweep join algorithm in a Hyracks operator. That implementation simply caches all data frames (that is, all records) in memory before running the plane-sweep algorithm. As the input datasets grow larger, this approach can require a huge memory footprint, which is undesirable for the big data that AsterixDB handles. In this blog post, I will describe how to improve the previous operator so that it runs with a limited memory capacity.

Main idea

The plane-sweep join algorithm can be considered as the two-dimensional version of the sort-merge join algorithm. After sorting the two lists, you synchronously iterate over them with two pointers and you always advance the pointer that points to the smaller key. This means that you do not really need to look at the two whole datasets for the algorithm to work. You only need to look at a small window of records in both datasets. While the two datasets can be arbitrarily large, this window can remain, in most cases, of a fixed size. The improvement that we're going to describe in this post revolves around this idea of keeping only the records in this small window in memory.
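To make the window idea concrete, here is a minimal batch sketch of a plane-sweep join over rectangles. It is not the operator's code: the class name PlaneSweepSketch, the int[] {x1, y1, x2, y2} layout, and the index-pair output are assumptions made for illustration. Note how each inner scan only looks forward from the current pointer and stops as soon as a record starts after the active record ends.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a batch plane-sweep join; not the operator's actual code. */
final class PlaneSweepSketch {

    /** Rectangles are {x1, y1, x2, y2}; both inputs must already be sorted by x1. */
    static List<int[]> join(int[][] r, int[][] s) {
        List<int[]> pairs = new ArrayList<>();
        int i = 0, j = 0;
        while (i < r.length && j < s.length) {
            if (r[i][0] <= s[j][0]) {
                // r[i] is active: scan s forward while records start before r[i] ends
                for (int k = j; k < s.length && s[k][0] <= r[i][2]; k++) {
                    if (overlapsInY(r[i], s[k])) {
                        pairs.add(new int[] {i, k});
                    }
                }
                i++; // records before the new r[i] are never visited again
            } else {
                // s[j] is active: scan r forward while records start before s[j] ends
                for (int k = i; k < r.length && r[k][0] <= s[j][2]; k++) {
                    if (overlapsInY(r[k], s[j])) {
                        pairs.add(new int[] {k, j});
                    }
                }
                j++; // records before the new s[j] are never visited again
            }
        }
        return pairs;
    }

    /** The sweep already guarantees overlap along x, so only y has to be tested. */
    private static boolean overlapsInY(int[] a, int[] b) {
        return a[1] <= b[3] && b[1] <= a[3];
    }

    public static void main(String[] args) {
        int[][] r = {{0, 0, 2, 2}, {3, 0, 5, 2}, {6, 0, 8, 2}};
        int[][] s = {{1, 1, 4, 3}, {5, 5, 7, 7}, {9, 0, 10, 1}};
        for (int[] p : join(r, s)) {
            System.out.println("r[" + p[0] + "] overlaps s[" + p[1] + "]");
        }
    }
}
```

Everything before the pointers i and j is dead weight in this sketch, which is exactly the part that the rest of this post evicts from memory.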

Refactor

As usual, the first step is to refactor the current code to prepare it for the new logic.

CachedFrameWriter

First, we extracted the CachedFrameWriter class to a separate file and improved it to work with a limited memory capacity. We used a CircularQueue, which keeps all the queue elements in a fixed-size array. We also added a new feature to the CachedFrameWriter that automatically clears all cached frames before the mark point. Thus, whenever the plane-sweep algorithm puts a mark, the CachedFrameWriter interprets it as an indication that earlier frames will never be accessed again by the plane-sweep join algorithm.
public void mark() {
  // This mark indicates that we do not need to get back beyond this point
  // We can shrink our queue now to accommodate new data frames
  cachedFrames.removeFirstN(this.currentFrame);
  this.markFrame = this.currentFrame = 0;
  this.markRecord = this.currentRecord;
}
We also added a couple more methods that will be used by the plane-sweep join operator.
public boolean canGrowInMemory() {
  return !cachedFrames.isFull() && !reachedEndOfStream;
}
/** Whether the end of the input has been reached, i.e., no more frames will be read */
public boolean isComplete() {
  return reachedEndOfStream;
}

/** Whether all cached frames have been consumed, i.e., none is left to iterate over */
public boolean noMoreImmediatelyAvailableRecords() {
  return this.currentFrame >= this.cachedFrames.size();
}
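The CircularQueue itself is not shown in this post, so the following is only a sketch of how such a queue could look. The method names add, isFull, size and removeFirstN are the ones used in the snippets above; the class name CircularQueueSketch, the get method and all implementation details are assumptions.

```java
/** Hypothetical fixed-capacity circular queue; a sketch, not the actual CircularQueue. */
final class CircularQueueSketch<T> {
    private final Object[] slots; // fixed-size backing array, never reallocated
    private int head;             // array index of the oldest element
    private int size;             // number of elements currently stored

    CircularQueueSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.slots = new Object[capacity];
    }

    boolean isFull() {
        return size == slots.length;
    }

    int size() {
        return size;
    }

    /** Appends an element at the tail; the caller must check isFull() first. */
    void add(T element) {
        if (isFull()) {
            throw new IllegalStateException("queue is full");
        }
        slots[(head + size) % slots.length] = element;
        size++;
    }

    /** Returns the i-th element counted from the oldest one. */
    @SuppressWarnings("unchecked")
    T get(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException("index " + i + ", size " + size);
        }
        return (T) slots[(head + i) % slots.length];
    }

    /** Drops the n oldest elements, which is what mark() relies on to free frames. */
    void removeFirstN(int n) {
        if (n < 0 || n > size) {
            throw new IllegalArgumentException("cannot remove " + n + " of " + size);
        }
        for (int k = 0; k < n; k++) {
            slots[(head + k) % slots.length] = null; // let the frames be garbage collected
        }
        head = (head + n) % slots.length;
        size -= n;
    }
}
```

Because the head index simply wraps around the array, removing the oldest frames costs time proportional to the number of removed frames and never moves the remaining ones.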

Incremental PlaneSweepJoin Helper Class

We modified the helper class that actually performs the plane-sweep join algorithm to be stateful so that it can run incrementally. It keeps track of its state and can continue from where it stopped. This allows the plane-sweep operator to run a few iterations of the plane-sweep join, which advances the pointers on the two datasets. Based on this progress, it can evict some data frames from memory to make room for more frames, and then continue from where it stopped. This also required changing the plane-sweep method from a class (static) method to an instance method so that it can access the state stored in the instance. The PlaneSweepJoin class now has the following instance variables.
public class PlaneSweepJoin {
  private IHyracksTaskContext ctx;
  private CachedFrameWriter[] datasets;
  private IFrameWriter outputWriter;
  private ITuplePairComparator rx1sx1;
  private ITuplePairComparator rx1sx2;
  private ITuplePairComparator sx1rx2;
  private IPredicateEvaluator predEvaluator;
  private FrameTupleAppender appender;

  /**
   * An enumerated type that stores the current state of the spatial join
   * algorithm so that it can continue from where it stopped.
   */
  public enum SJ_State {
    /** The spatial join algorithm did not start yet */
    SJ_NOT_STARTED,
    /** The current record in dataset 0 is active
      * and needs to be tested with more records in dataset 1 */
    SJ_DATASET0_ACTIVE,
    /** The current record in dataset 1 is active
        and needs to be tested with more records in dataset 0 */
    SJ_DATASET1_ACTIVE,
    /** No active records */
    SJ_NOT_ACTIVE,
    /** The spatial join algorithm has finished. No more work to do. */
    SJ_FINISHED,
  };
  /** Current state of the spatial join algorithm */
  private SJ_State sjState;
Up to this point, we have not made any serious changes to the actual plane-sweep join method. It works as before; it just keeps its state in instance variables instead of local variables.
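To illustrate what "keeping the state in instance variables" buys us, here is a simplified, hypothetical sketch of a resumable sweep. It works on one-dimensional intervals {x1, x2} instead of real records, it never evicts anything, and all names (ResumableSweepSketch, feedR, feedS, endR, endS, resume) are made up for this example. The states loosely mirror the SJ_State values above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resumable sweep over 1-D intervals {x1, x2}; inputs arrive sorted by x1. */
final class ResumableSweepSketch {
    enum State { NOT_ACTIVE, R_ACTIVE, S_ACTIVE, FINISHED }

    private final List<int[]> r = new ArrayList<>();
    private final List<int[]> s = new ArrayList<>();
    private boolean rDone, sDone; // set once an input will receive no more records
    private int i, j, k;          // sweep pointers and the position of the inner scan
    private State state = State.NOT_ACTIVE;
    final List<int[]> pairs = new ArrayList<>();

    void feedR(int x1, int x2) { r.add(new int[] {x1, x2}); }
    void feedS(int x1, int x2) { s.add(new int[] {x1, x2}); }
    void endR() { rDone = true; }
    void endS() { sDone = true; }
    State state() { return state; }

    /** Advances as far as the buffered input allows, then returns; call again after feeding. */
    void resume() {
        while (state != State.FINISHED) {
            switch (state) {
                case NOT_ACTIVE:
                    if (i == r.size() || j == s.size()) {
                        boolean exhausted = (i == r.size() && rDone) || (j == s.size() && sDone);
                        if (!exhausted) {
                            return; // pause until more input arrives
                        }
                        state = State.FINISHED;
                    } else if (r.get(i)[0] <= s.get(j)[0]) {
                        state = State.R_ACTIVE;
                        k = j;
                    } else {
                        state = State.S_ACTIVE;
                        k = i;
                    }
                    break;
                case R_ACTIVE:
                    while (k < s.size() && s.get(k)[0] <= r.get(i)[1]) {
                        pairs.add(new int[] {i, k});
                        k++;
                    }
                    if (k == s.size() && !sDone) {
                        return; // the scan may have to continue in input not yet seen
                    }
                    i++;
                    state = State.NOT_ACTIVE;
                    break;
                case S_ACTIVE:
                    while (k < r.size() && r.get(k)[0] <= s.get(j)[1]) {
                        pairs.add(new int[] {k, j});
                        k++;
                    }
                    if (k == r.size() && !rDone) {
                        return;
                    }
                    j++;
                    state = State.NOT_ACTIVE;
                    break;
                default:
                    return;
            }
        }
    }
}
```

The caller feeds a few records, calls resume(), feeds more, and calls resume() again. Because i, j, k and the state survive between calls, an inner scan that ran out of buffered input picks up exactly where it paused.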

Remodeled PlaneSweepJoinOperatorDescriptor

Now comes the important part of this blog post, where we modify the plane-sweep operator to make it more memory friendly. The main modification is when the plane-sweep function is called. Previously, it was called once, after the two inputs were completely cached. Now, it is called more often to do part of the join and free some memory. To that end, we modified the CachedFrameWriter#nextFrame method to call the plane-sweep join as soon as the in-memory buffer is full.
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
  // Store this buffer in memory for later use
  ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
  FrameUtils.copyAndFlip(buffer, copyBuffer);
  if (cachedFrames.isFull()) {
    // run the plane-sweep algorithm in case it can free some buffer entries
    try {
      owner.getPlaneSweepJoin().planesweepJoin(this);
      if (owner.getPlaneSweepJoin().getState() == PlaneSweepJoin.SJ_State.SJ_FINISHED) {
        this.clear();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag instead of swallowing the interruption
      Thread.currentThread().interrupt();
    }
    // TODO If after running the plane-sweep, we still cannot find empty entries,
    // we should start spilling records to disk.
    if (cachedFrames.isFull())
      throw new HyracksDataException("Memory full");
  }
  cachedFrames.add(copyBuffer);
}
Notice that there are two instances of CachedFrameWriter, one for each input dataset. Therefore, the PlaneSweepJoin#planesweepJoin method can be called by either of the two CachedFrameWriters, and it has to synchronize these calls internally. In the code shown above, a CachedFrameWriter calls the planesweepJoin method when its memory buffer is full. In this case, the method waits until the other CachedFrameWriter also has its memory buffer full. Then it actually carries out the spatial join algorithm, releases some memory buffers, and returns control to the caller. This part is shown in the following code snippet.
public synchronized void planesweepJoin(CachedFrameWriter fullDataset)
        throws HyracksDataException, InterruptedException {
  // No more work to do
  if (sjState == SJ_State.SJ_FINISHED)
    return;
  CachedFrameWriter otherDataset =
      fullDataset == datasets[0] ? datasets[1] : datasets[0];
  if (otherDataset.canGrowInMemory()) {
    this.wait();
    return;
  }
  // ... Perform the actual spatial join logic here
  if ((datasets[0].isComplete() &&
       datasets[0].noMoreImmediatelyAvailableRecords()) ||
      (datasets[1].isComplete() &&
       datasets[1].noMoreImmediatelyAvailableRecords()))
    sjState = SJ_State.SJ_FINISHED;
  if (sjState == SJ_State.SJ_FINISHED) {
    appender.flush(outputWriter, true);
    datasets[0].clear();
    datasets[1].clear();
    outputWriter.close();
  }
  // To wake up the other thread
  this.notify();
}
In the code above, the caller passes itself to the planesweepJoin method. This tells the method whether it needs to block and wait until the other dataset's memory buffer is full, or whether it can go ahead with the join logic right away. Since there are only two datasets, the planesweepJoin method determines the other dataset and waits if that dataset can still grow in memory. This condition holds when the first CachedFrameWriter is full but the second one is not. When the second CachedFrameWriter also becomes full, the condition no longer holds and that call carries out the actual spatial join code. At the very end, after the join logic has finished, the notify() method is called to wake up the other sleeping thread. That sleeping thread returns immediately because the thread that woke it up has already performed the spatial join.
In this execution pattern, the spatial join can be thought of as a cooperative task that either of the two threads can perform whenever possible. This is usually better than spawning a third thread that keeps joining records inserted by the two other threads. It also fits the architecture of AsterixDB better, as the underlying framework is the one responsible for creating threads.
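The cooperative pattern can be tried out in isolation with the following sketch. It is a simplification under several assumptions: the class CooperativeDrainSketch and its methods are hypothetical, the "join" is replaced by simply counting and discarding buffered items, and the wait is placed in a loop so that a spurious wakeup is re-checked. The real operator only frees the frames before the mark, not the whole buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the cooperative wait/notify pattern; not the operator's code. */
final class CooperativeDrainSketch {
    private final int capacity;
    private final List<List<Integer>> buffers = new ArrayList<>();
    private final boolean[] finished = new boolean[2];
    private int processed;

    CooperativeDrainSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        buffers.add(new ArrayList<>());
        buffers.add(new ArrayList<>());
    }

    /** Called by producer `me` (0 or 1) for every incoming item. */
    synchronized void offer(int me, int item) throws InterruptedException {
        int other = 1 - me;
        while (buffers.get(me).size() == capacity) {
            boolean otherCanGrow = buffers.get(other).size() < capacity && !finished[other];
            if (otherCanGrow) {
                wait();      // the other producer will do the work and wake us up
            } else {
                drainBoth(); // this thread performs the shared work itself
                notifyAll();
            }
        }
        buffers.get(me).add(item);
    }

    /** Called once by each producer when its input ends. */
    synchronized void finish(int me) {
        finished[me] = true;
        if (finished[1 - me]) {
            drainBoth();     // the last producer to finish processes the leftovers
        }
        notifyAll();         // a waiting producer must re-check its condition
    }

    /** Stand-in for the join: counts the buffered items and frees both buffers. */
    private void drainBoth() {
        for (List<Integer> buffer : buffers) {
            processed += buffer.size();
            buffer.clear();
        }
    }

    synchronized int processed() {
        return processed;
    }

    /** Runs two producer threads to completion and returns the number of processed items. */
    static int runDemo(int itemsA, int itemsB, int capacity) {
        CooperativeDrainSketch sketch = new CooperativeDrainSketch(capacity);
        int[] counts = {itemsA, itemsB};
        Thread[] producers = new Thread[2];
        for (int p = 0; p < 2; p++) {
            final int me = p;
            producers[p] = new Thread(() -> {
                try {
                    for (int n = 0; n < counts[me]; n++) {
                        sketch.offer(me, n);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    sketch.finish(me);
                }
            });
            producers[p].start();
        }
        for (Thread t : producers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.processed();
    }
}
```

Calling CooperativeDrainSketch.runDemo(25, 7, 4) runs two producers of different lengths through buffers of four items each. No third thread is involved: whichever producer finds both buffers unable to grow does the work.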

Testing

To test whether this code works, we execute the spatial join operation on two large datasets with a limited memory budget that cannot hold the two full datasets. In this case, the operator has to execute multiple runs of the spatial join algorithm to get the work done. The test is similar to the previous one, but it has an additional parameter to limit the memory usage.
PlaneSweepJoinOperatorDescriptor join =
  new PlaneSweepJoinOperatorDescriptor(spec,
    new X1X1ComparatorI(), new X1X2ComparatorI(),
    new X1X2ComparatorI(), outputDesc, 10,
    new SpatialOverlapPredicateI());
The sixth parameter (with value 10) tells the PlaneSweepJoinOperatorDescriptor that it has at most 10 memory buffers to cache the input datasets, while the input data needs at least 20 buffers to be completely cached in memory. Of course, in a real environment we would probably have many more memory buffers.
In the new test case, I also added a case where the two inputs are not sorted. It uses the existing Hyracks sort operator to sort both inputs before feeding them to the spatial join operator. You can find the source code of this new test case in the downloads section below.

Downloads

As usual, all the source code of this blog post can be found here.
