Tuesday, November 24, 2015

A Hyracks operator for plane-sweep join

This is the third post in a series about creating an efficient Hyracks operator for spatial join. In the previous two posts, we gave an introduction to Hyracks operators and briefly described how to write a simple one. In this post, we describe how to make that operator more efficient by using a plane-sweep spatial join algorithm instead of a naive nested-loop algorithm.

Scope of this blog post

In this blog post, we focus on improving the operator we created in the last post by replacing the nested-loop join subroutine with the more efficient plane-sweep join algorithm. In addition, we do some minor refactoring to keep the code organized. For simplicity, we assume that the two inputs are already sorted on the x coordinate (the left edge, x1), which can be done using one of the sort operators that ship with AsterixDB, e.g., ExternalSortOperatorDescriptor.

Code Refactor

Let's first quickly describe the code refactor we did to make the code more organized.

Self-contained CacheFrameWriter

I made the CacheFrameWriter self-contained by moving the RecordDescriptor and the FrameTupleAccessor inside it. This makes it easier to iterate over the records inside it and avoids repeating the iteration code for the two input datasets.
public class CacheFrameWriter implements IFrameWriter {
  /** {@link FrameTupleAccessor} to iterate over records */
  protected FrameTupleAccessor fta;
  /** {@link RecordDescriptor} for cached data */
  private RecordDescriptor rd;
}
I also added the necessary attributes and methods to iterate over cached records.
/** The current frame being accessed */
private int currentFrame;
/** The index of the record inside the current frame being accessed */
protected int currentRecord;

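public void init() {
  this.currentFrame = 0;
  this.currentRecord = 0;
  this.fta = new FrameTupleAccessor(rd);
  // Guard against an empty input; isEOL() is then true right away
  if (this.cachedFrames.size() > 0) {
    this.fta.reset(this.cachedFrames.get(currentFrame));
  }
}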

/** Returns true if end-of-list has been reached */
public boolean isEOL() {
  return this.currentFrame >= this.cachedFrames.size();
}

public void next() {
  this.currentRecord++;
  // Skip to next frame if reached end of current frame
  while (currentRecord >= fta.getTupleCount() && currentFrame < cachedFrames.size()) {
    currentFrame++;
    if (currentFrame < cachedFrames.size()) {
      // Move to next data frame
      this.fta.reset(this.cachedFrames.get(currentFrame));
      currentRecord = 0;
    }
  }
}
Finally, I added the mark/reset functionality to the CacheFrameWriter so that I can mark the current position and reset to it later.
/** The index of the marked frame */
private int markFrame;
/** The index of the marked record inside the marked frame */
private int markRecord;

/** Put a mark on the current record being accessed */
public void mark() {
  this.markFrame = this.currentFrame;
  this.markRecord = this.currentRecord;
}

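/** Reset the iterator to the last marked position */
public void reset() {
  this.currentFrame = this.markFrame;
  this.currentRecord = this.markRecord;
  // Point the accessor back at the marked frame, because the scan
  // since mark() may have moved it to a later frame
  if (!isEOL()) {
    this.fta.reset(this.cachedFrames.get(currentFrame));
  }
}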
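The iteration and mark/reset logic can be tried out without Hyracks. The following is only a minimal sketch under simplifying assumptions: each cached frame is modeled as a plain int[] of records, and the names FrameCursor, current, and drain are hypothetical and not part of the actual CacheFrameWriter. Because the sketch looks up the frame by index on every access, it has no accessor that must be re-pointed at the marked frame after a reset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CacheFrameWriter: each int[] plays the role of one cached frame. */
class FrameCursor {
  private final List<int[]> frames;
  private int currentFrame;
  private int currentRecord;
  private int markFrame;
  private int markRecord;

  FrameCursor(List<int[]> frames) {
    this.frames = frames;
    skipExhaustedFrames(); // Handles empty leading frames and an empty input
  }

  /** Returns true if end-of-list has been reached */
  boolean isEOL() {
    return currentFrame >= frames.size();
  }

  /** Returns the record at the current position */
  int current() {
    return frames.get(currentFrame)[currentRecord];
  }

  void next() {
    currentRecord++;
    skipExhaustedFrames();
  }

  /** Put a mark on the current record being accessed */
  void mark() {
    markFrame = currentFrame;
    markRecord = currentRecord;
  }

  /** Return to the last marked position */
  void reset() {
    currentFrame = markFrame;
    currentRecord = markRecord;
  }

  /** Moves to the first record of the next non-empty frame, if needed */
  private void skipExhaustedFrames() {
    while (currentFrame < frames.size()
        && currentRecord >= frames.get(currentFrame).length) {
      currentFrame++;
      currentRecord = 0;
    }
  }

  /** Collects all records from the current position to the end */
  List<Integer> drain() {
    List<Integer> out = new ArrayList<>();
    for (; !isEOL(); next()) {
      out.add(current());
    }
    return out;
  }
}
```

Marking a position, scanning to the end, and then calling reset() returns the cursor to the marked record even when the scan crossed frame boundaries, which is the behavior the join relies on.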

Make CacheFrameWriter an inner class

I also moved the CacheFrameWriter inside the PlaneSweepActivityNode class. This allowed me to remove the unnecessary Notifiable interface and to cascade the failure of the input to the output in the CacheFrameWriter#fail method.
@Override
public void fail() throws HyracksDataException {
  outputWriter.fail(); // Cascade the failure to the output
  cachedFrames = null; // To prevent further insertions
}

Extract the spatial join algorithm

We also extracted the spatial join algorithm, currently nested-loop, to a helper class called PlaneSweepJoin.
public class PlaneSweepJoin {
  public static void planesweepJoin(IHyracksTaskContext ctx,
      CacheFrameWriter[] datasets, IFrameWriter outputWriter,
      IPredicateEvaluator predEvaluator) throws HyracksDataException {
    // Spatial join code goes here
  }
}
I made it a static method to keep it totally isolated and to avoid the creation of unnecessary objects. In the PlaneSweepJoinOperatorDescriptor class, this method is called in the PlaneSweepActivityNode#notify method, which I also renamed to #inputComplete.
public void inputComplete() throws HyracksDataException {
  // A notification that one of the inputs has been completely read
  numInputsComplete++;
  if (numInputsComplete == 2) {
    PlaneSweepJoin.planesweepJoin(ctx, datasets, outputWriter, predEvaluator);
  }
}

Create the necessary comparators

In order to run the plane-sweep join algorithm, we need to compare records in the two datasets based on their x coordinates. Since all records are kept in their binary format, we create helper comparators that compare these records without having to parse them. In our case, the two input datasets have the same format, so two comparators are enough: one that compares the left edges (x1) of two records, and one that compares the left edge (x1) of one record to the right edge (x2) of the other. The implementation of the first comparator is shown below; the other can be implemented similarly.
public static class X1X1ComparatorD implements ITuplePairComparator, Serializable {
  @Override
  public int compare(IFrameTupleAccessor fta0, int tupId0,
                     IFrameTupleAccessor fta1, int tupId1)
                     throws HyracksDataException {
    // Read the coordinates of the first rectangle
    ByteBuffer buf = fta0.getBuffer();
    double r0_x1 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 1));
    // Read the coordinates of the second rectangle
    buf = fta1.getBuffer();
    double r1_x1 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 1));

    if (r0_x1 < r1_x1) return -1;
    if (r0_x1 > r1_x1) return 1;
    return 0;
  }
}
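The idea of comparing records directly in their binary form can be illustrated without Hyracks. The sketch below is not the Hyracks frame format: it assumes a made-up layout in which each record is four consecutive doubles (x1, y1, x2, y2), and the names BinaryRectComparators, compareX1X1, compareX1X2, and pack are hypothetical. It shows both kinds of comparison, left edge against left edge and left edge against right edge, each reading a single double per record.

```java
import java.nio.ByteBuffer;

/** Hypothetical comparators over rectangles packed as x1, y1, x2, y2 doubles. */
class BinaryRectComparators {
  static final int RECORD_SIZE = 4 * Double.BYTES;
  static final int X1_OFFSET = 0;
  static final int X2_OFFSET = 2 * Double.BYTES;

  /** Compares the left edge of record rec0 in b0 to the left edge of record rec1 in b1. */
  static int compareX1X1(ByteBuffer b0, int rec0, ByteBuffer b1, int rec1) {
    double x1a = b0.getDouble(rec0 * RECORD_SIZE + X1_OFFSET);
    double x1b = b1.getDouble(rec1 * RECORD_SIZE + X1_OFFSET);
    return Double.compare(x1a, x1b);
  }

  /** Compares the left edge of record rec0 in b0 to the right edge of record rec1 in b1. */
  static int compareX1X2(ByteBuffer b0, int rec0, ByteBuffer b1, int rec1) {
    double x1 = b0.getDouble(rec0 * RECORD_SIZE + X1_OFFSET);
    double x2 = b1.getDouble(rec1 * RECORD_SIZE + X2_OFFSET);
    return Double.compare(x1, x2);
  }

  /** Packs rectangles given as {x1, y1, x2, y2} into one buffer, for demonstration only. */
  static ByteBuffer pack(double[][] rects) {
    ByteBuffer buf = ByteBuffer.allocate(rects.length * RECORD_SIZE);
    for (double[] rect : rects) {
      for (double v : rect) {
        buf.putDouble(v);
      }
    }
    return buf;
  }
}
```

The absolute getDouble(int) reads do not move the buffer position, so the same buffer can be compared repeatedly without being rewound.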
These comparators are passed to the constructor of the PlaneSweepJoinOperatorDescriptor, which in turn passes them to the PlaneSweepActivityNode.
public PlaneSweepJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
        ITuplePairComparator rx1sx1, ITuplePairComparator rx1sx2,
        ITuplePairComparator sx1rx2, RecordDescriptor outputDescriptor,
        IPredicateEvaluator predEvaluator) {
  super(spec, 2, 1);
  this.rx1sx1 = rx1sx1;
  this.rx1sx2 = rx1sx2;
  this.sx1rx2 = sx1rx2;
  // The rest of the constructor goes here ...
}
and
public void inputComplete() throws HyracksDataException {
  // A notification that one of the inputs has been completely read
  numInputsComplete++;
  if (numInputsComplete == 2) {
    PlaneSweepJoin.planesweepJoin(ctx, datasets, outputWriter,
       rx1sx1, rx1sx2, sx1rx2, predEvaluator);
  }
}

Notice that the join operator takes three comparators because the formats of the two datasets might differ, which means the comparator that compares x1 of the left dataset to x2 of the right dataset could be different from the comparator that compares x1 of the right dataset to x2 of the left dataset.

The Plane-sweep Join function

Finally, the plane-sweep join function replaces the nested-loop join function. The function is shown below.
public static void planesweepJoin(IHyracksTaskContext ctx,
    CacheFrameWriter[] datasets, IFrameWriter outputWriter,
    ITuplePairComparator rx1sx1, ITuplePairComparator rx1sx2,
    ITuplePairComparator sx1rx2, IPredicateEvaluator predEvaluator)
    throws HyracksDataException {
  datasets[0].init();
  datasets[1].init();
  FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
  outputWriter.open();
  // The two inputs have been completely read. Do the join
  while (!datasets[0].isEOL() && !datasets[1].isEOL()) {
    // Move the sweep line to the lower of the two current records
    if (rx1sx1.compare(datasets[0].fta, datasets[0].currentRecord,
           datasets[1].fta, datasets[1].currentRecord) < 0) {
      // Sweep line stops at an r record (dataset 0)
      // Current r record is called the active record
      // Scan records in dataset 1 until we pass the right
      // edge of the active record. i.e., while (s.x1 < r.x2)

      // Mark the current position at dataset 1 to return to it later
      datasets[1].mark();

      while (!datasets[1].isEOL() && sx1rx2.compare(
              datasets[1].fta, datasets[1].currentRecord,
              datasets[0].fta, datasets[0].currentRecord) < 0) {
        // Check if r and s overlap
        if (predEvaluator.evaluate(
              datasets[0].fta, datasets[0].currentRecord,
              datasets[1].fta, datasets[1].currentRecord)) {
          // Report this pair to answer
          FrameUtils.appendConcatToWriter(outputWriter, appender,
                  datasets[0].fta, datasets[0].currentRecord,
                  datasets[1].fta, datasets[1].currentRecord);
        }
        // Move to next record in s
        datasets[1].next();
      }

      // Reset to the old position of dataset 1
      datasets[1].reset();
      // Move to the next record in dataset 0
      datasets[0].next();
    } else {
      // Sweep line stops at an s record (dataset 1)
      // Current s record is called the active record
      // Scan records in dataset 0 until we pass the right
      // edge of the active record. i.e., while (r.x1 < s.x2)

      // Mark the current position at dataset 0 to return to it later
      datasets[0].mark();

      while (!datasets[0].isEOL() && rx1sx2.compare(
               datasets[0].fta, datasets[0].currentRecord,
               datasets[1].fta, datasets[1].currentRecord) < 0) {
        // Check if r and s overlap
        if (predEvaluator.evaluate(
               datasets[0].fta, datasets[0].currentRecord,
               datasets[1].fta, datasets[1].currentRecord)) {
          // Report this pair to answer
          FrameUtils.appendConcatToWriter(outputWriter, appender,
                     datasets[0].fta, datasets[0].currentRecord,
                     datasets[1].fta, datasets[1].currentRecord);
        }
        // Move to next record in r
        datasets[0].next();
      }

      // Reset to the old position of dataset 0
      datasets[0].reset();
      // Move to the next record in dataset 1
      datasets[1].next();
    }
  }
  // Finalize output writing
  appender.flush(outputWriter, true);
  outputWriter.close();
}
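To see the algorithm in isolation, here is a minimal in-memory sketch of the same sweep that can be checked against a nested-loop join. It is not the operator code: rectangles are plain double[] arrays of the form {x1, y1, x2, y2}, both inputs are assumed to be sorted by x1, and the names PlaneSweepSketch, overlaps, nestedLoopJoin, and keys are hypothetical. The overlap test is also an assumption, since the post does not show the predicate: rectangles that only touch along an edge are treated as not overlapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PlaneSweepSketch {
  /** Assumed predicate: true if the interiors of a and b intersect. */
  static boolean overlaps(double[] a, double[] b) {
    return a[0] < b[2] && b[0] < a[2] && a[1] < b[3] && b[1] < a[3];
  }

  /** Plane-sweep join; r and s must be sorted by x1. Returns index pairs {i, j}. */
  static List<int[]> planeSweepJoin(double[][] r, double[][] s) {
    List<int[]> result = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < r.length && j < s.length) {
      if (r[i][0] < s[j][0]) {
        // r[i] is the active record: scan s while s.x1 < r.x2
        for (int k = j; k < s.length && s[k][0] < r[i][2]; k++) {
          if (overlaps(r[i], s[k])) {
            result.add(new int[] {i, k});
          }
        }
        i++;
      } else {
        // s[j] is the active record: scan r while r.x1 < s.x2
        for (int k = i; k < r.length && r[k][0] < s[j][2]; k++) {
          if (overlaps(r[k], s[j])) {
            result.add(new int[] {k, j});
          }
        }
        j++;
      }
    }
    return result;
  }

  /** Reference implementation that tests every pair. */
  static List<int[]> nestedLoopJoin(double[][] r, double[][] s) {
    List<int[]> result = new ArrayList<>();
    for (int i = 0; i < r.length; i++) {
      for (int j = 0; j < s.length; j++) {
        if (overlaps(r[i], s[j])) {
          result.add(new int[] {i, j});
        }
      }
    }
    return result;
  }

  /** Converts pairs to an order-independent set of "i,j" strings for comparison. */
  static Set<String> keys(List<int[]> pairs) {
    Set<String> out = new TreeSet<>();
    for (int[] p : pairs) {
      out.add(p[0] + "," + p[1]);
    }
    return out;
  }
}
```

The local index k plays the role of mark() and reset(): the inner scan runs ahead on a copy of the position, so the outer position of the scanned dataset is left untouched. The two joins can report pairs in different orders, which is why the comparison goes through keys.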

Benefits of using the comparator interface

Although using the comparator interface might not be as straightforward as using the actual records, it has two main benefits. First, it is more efficient because it does not have to instantiate and parse whole records when all the comparisons are done on only two attributes, x1 and x2. Second, it makes the join algorithm oblivious to the underlying data format, which means one can easily use records of a different format without having to modify the spatial join code. As an example, I added a second test case, found in the download section, which runs the same spatial join technique against rectangles represented with integer coordinates rather than doubles.
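The format-obliviousness argument can be illustrated with a small generic sketch. This is an in-memory analogue under stated assumptions, not the code from the download: PairComparator is a hypothetical stand-in for ITuplePairComparator, GenericSweep and join are made-up names, and the record types are ordinary Java objects instead of binary tuples. The join routine never looks inside a record; everything it knows about the format comes from the three comparators and the predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

class GenericSweep {
  /** Hypothetical in-memory analogue of a tuple pair comparator. */
  interface PairComparator<A, B> {
    int compare(A a, B b);
  }

  /** Plane-sweep join over two lists sorted by x1. Returns index pairs {i, j}. */
  static <R, S> List<int[]> join(List<R> r, List<S> s,
      PairComparator<R, S> rx1sx1, PairComparator<R, S> rx1sx2,
      PairComparator<S, R> sx1rx2, BiPredicate<R, S> overlap) {
    List<int[]> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < r.size() && j < s.size()) {
      if (rx1sx1.compare(r.get(i), s.get(j)) < 0) {
        // Active record comes from r: scan s while s.x1 < r.x2
        for (int k = j; k < s.size() && sx1rx2.compare(s.get(k), r.get(i)) < 0; k++) {
          if (overlap.test(r.get(i), s.get(k))) {
            out.add(new int[] {i, k});
          }
        }
        i++;
      } else {
        // Active record comes from s: scan r while r.x1 < s.x2
        for (int k = i; k < r.size() && rx1sx2.compare(r.get(k), s.get(j)) < 0; k++) {
          if (overlap.test(r.get(k), s.get(j))) {
            out.add(new int[] {k, j});
          }
        }
        j++;
      }
    }
    return out;
  }
}
```

For example, the left input could hold double[] rectangles and the right input int[] rectangles. The same join method then works unchanged, and the two mixed comparators (r.x1 against s.x2, and s.x1 against r.x2) have different argument types, which is one way to see why the operator accepts three comparators rather than two:

```java
List<int[]> pairs = GenericSweep.join(doubleRects, intRects,
    (a, b) -> Double.compare(a[0], b[0]),
    (a, b) -> Double.compare(a[0], b[2]),
    (b, a) -> Double.compare(b[0], a[2]),
    (a, b) -> a[0] < b[2] && b[0] < a[2] && a[1] < b[3] && b[1] < a[3]);
```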

Downloads

You can download the full source code for this blog post here.
