Monday, November 23, 2015

Your first Hyracks operator

In a previous blog post, I introduced Hyracks operators and briefly described how they work. In this blog post, I'll show how to create and use a very simple Hyracks operator that performs spatial join using a naive nested loop algorithm. Although there is already an existing nested loop join operator in AsterixDB, I provide a simpler, probably less efficient, implementation for the sake of demonstration. This blog post will focus on the steps needed to create a Hyracks operator while future blog posts will show how to improve this operator to be more efficient.

Overview

The above figure gives an overview of the new operator. Notice that I'm calling it PlaneSweep operator although it currently applies a nested loop algorithm because I'm planning to change the implementation in the future. For simplicity, I use only one activity node that performs the join. The two inputs are directly connected to this activity node and it directly writes to the final output.

Operator Descriptor

The main entry point to any Hyracks operator is the OperatorDescriptor which constructs the activity node(s) and connects them to the inputs and outputs. In this example, I'll directly extend the AbstractOperatorDescriptor. However, there are other abstract operator descriptors that can be used as templates, such as the AbstractSingleActivityOperatorDescriptor which would also work in this example. The class definition looks like this:

public class PlaneSweepJoinOperatorDescriptor
  extends AbstractOperatorDescriptor

The constructor of the new operator takes as input the descriptors of the inputs and outputs, which are needed to correctly parse the input data frames, and correctly write the output data frames. In addition, it takes a predicate evaluator that is used to test if two input records satisfy the spatial predicate (will be described soon).
public PlaneSweepJoinOperatorDescriptor(
   IOperatorDescriptorRegistry spec,
   RecordDescriptor outputDescriptor,
   IPredicateEvaluator predEvaluator) {
  super(spec, 2, 1); // Two inputs, one output
  this.recordDescriptors[0] = outputDescriptor; // output descriptor
  this.predEvaluator = predEvaluator; // Spatial join predicate
}
The primary method in the operator descriptor is contributeActivities, which actually creates the activities and connects them to inputs and outputs. As described above, we currently create only one activity that takes two inputs and produces one output.
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
  ActivityId joinId = new ActivityId(getOperatorId(), JOIN_ACTIVITY_ID);
  PlaneSweepJoinActivityNode joinActivity =
      new PlaneSweepJoinActivityNode(joinId, predEvaluator);

  // Add this activity node
  builder.addActivity(this, joinActivity);
  // Connect to first input
  builder.addSourceEdge(0, joinActivity, 0);
  // Connect to second input
  builder.addSourceEdge(1, joinActivity, 1);
  // Connect to output
  builder.addTargetEdge(0, joinActivity, 0);
}
The job of the operator descriptor ends by defining the activity nodes. All the remaining work is done by this activity node which will be described next.

Activity Node

The activity node is the class that deals with input frames and produces output frames. It implements the interface IActivity which contains mainly one method, createPushRuntime. This method creates an instance of type IOperatorNodePushable to which data frames get pushed.

Operator Node Pushable

The operator node pushable is created by the activity node and passed to the Hyracks engine which uses it to push input data frames. It also pushes the output data frames to the next operator or activity node. It has primarily two methods that need to be implemented, getInputFrameWriter and setOutputFrameWriter. Both of them deal with the interface IFrameWriter which is a push interface that gets notified whenever there is a data frame available for processing. The input frame writers, which are created by the OperatorNodePushable, get notified whenever there is an input frame available. On the other hand, the output frame writer is created outside the OperatorNodePushable and it pushes output data frames whenever one is ready.

Input Frame Writers

For the input frame writers, I use a very simple class that caches all data frames in memory. Of course it would fail if the data is too much for main memory, but I use it only for demonstration.
public class CacheFrameWriter
      implements IFrameWriter {
  /** All cached frames */
  protected List<ByteBuffer> cachedFrames;
  /** Hyracks context of the running job */
  private IHyracksTaskContext ctx;
  /** The CacheFrameWriter notifies this when the it is closed */
  private Notifiable notifiable;

  public CacheFrameWriter(IHyracksTaskContext ctx,
         Notifiable notifiable) {
    this.ctx = ctx;
    this.notifiable = notifiable;
  }

  @Override
  public void open() throws HyracksDataException {
    // Initialize the in-memory store that will be used to store frames
    cachedFrames = new ArrayList<ByteBuffer>();
  }

  @Override
  public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
    // Store this buffer in memory for later use
    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
    FrameUtils.copyAndFlip(buffer, copyBuffer);
    cachedFrames.add(copyBuffer);
  }

  @Override
  public void fail() throws HyracksDataException {
    cachedFrames = null; // To prevent further insertions
  }

  @Override
  public void close() throws HyracksDataException {
    // Notify its creator that it has been closed
    notifiable.notify(this);
  }
}

To read the input frames, we create two CacheFrameWriters as one per input dataset. The following code snippet is part of the method PlaneSweepJoinActivityNode#createPushRuntime

// One CacheFrameWriter per input
inputDatasets = new CacheFrameWriter[inputDatasets.length];
for (int i = 0; i < inputDatasets.length; i++)
  inputDatasets[i] = new CacheFrameWriter(ctx, this);

After each of the two inputs frame writers are closed, meaning that all the input frames have been read, it notifies the PlaneSweepJoinActivityNode through a notify method that we added. When this method is called twice, it knows that all the inputs have been read and starts the join process as shown below.
@Override
public void notify(Object notified) throws HyracksDataException {
  // A notification that one of the inputs has been completely read
  numInputsComplete++;
  if (numInputsComplete == 2) {
    // The two inputs have been completely read. Do the join
    FrameTupleAccessor accessor0 = new FrameTupleAccessor(rd0);
    FrameTupleAccessor accessor1 = new FrameTupleAccessor(rd1);
    FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
    outputWriter.open();
    // XXX run a nest loop join, for now
    for (ByteBuffer data0 : inputDatasets[0].cachedFrames) {
      for (ByteBuffer data1 : inputDatasets[1].cachedFrames) {
        // Compare records in frame1 with records in frame2
        accessor0.reset(data0);
        accessor1.reset(data1);

        for (int i0 = 0; i0 < accessor0.getTupleCount(); i0++) {
          for (int i1 = 0; i1 < accessor1.getTupleCount(); i1++) {
            if (predEvaluator.evaluate(accessor0, i0, accessor1, i1)) {
              // Found a matching pair, write them to the output
              FrameUtils.appendConcatToWriter(outputWriter, appender,
                  accessor0, i0, accessor1, i1);
              System.out.println("Found a matching pair");
            }
          }
        }
      }
    }
    appender.flush(outputWriter, true);
    outputWriter.close();
  }
}
Notice that this method uses the IPredicateEvaluator interface to check whether two records satisfy the predicate or not. The predEvaluator object is supplied to the plane sweep operator and might change according to the format of the input records and the actual predicate being evaluated. This makes the spatial join operator agnostic to the underlying data format. An example predicate evaluator is shown next.

Predicate Evaluator

A predicate evaluator implements the interface IPredicateEvaluator interface which contains a single evaluate method which takes two records and returns either true or false for whether they satisfy the predicate or not. The following sample implementation tests the spatial overlap predicate between two records of the format (id, x1, y1, x2, y2).
public static class SpatialOverlapPredicate
implements IPredicateEvaluator, Serializable {
  @Override
  public boolean evaluate(IFrameTupleAccessor fta0, int tupId0,
                          IFrameTupleAccessor fta1, int tupId1) {
    // Read the coordinates of the first rectangle
    ByteBuffer buf = fta0.getBuffer();
    double r0_x1 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 1));
    double r0_y1 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 2));
    double r0_x2 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 3));
    double r0_y2 = buf.getDouble(fta0.getAbsoluteFieldStartOffset(tupId0, 4));
    // Read the coordinates of the second rectangle
    buf = fta1.getBuffer();
    double r1_x1 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 1));
    double r1_y1 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 2));
    double r1_x2 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 3));
    double r1_y2 = buf.getDouble(fta1.getAbsoluteFieldStartOffset(tupId1, 4));

    // Evaluate the overlap of the two rectangles
    return r0_x2 > r1_x1 && r1_x2 > r0_x1 &&
           r0_y2 > r1_y1 && r1_y2 > r0_y1;
  }
}

Testing

The last part is to actually test the spatial join operator. I created a simple test case with two input datasets of rectangles. To create the whole Hyracks job, we need to specify the input format and the output format. All job parameters are specified in an object of type JobSpecification, which is similar to the Job or JobConf in Hadoop.
JobSpecification spec = new JobSpecification();
The first input is defined using the following code snippet.
// Define first input file
FileSplit[] rect1Splits = new FileSplit[] {
  new FileSplit(NC1_ID,
  new FileReference(new File("data/spatial/rects1.csv"))) };
IFileSplitProvider rect1SplitsProvider =
  new ConstantFileSplitProvider(rect1Splits);
RecordDescriptor rect1Desc = new RecordDescriptor(
  new ISerializerDeserializer[] {
      IntegerSerializerDeserializer.INSTANCE,
      DoubleSerializerDeserializer.INSTANCE,
      DoubleSerializerDeserializer.INSTANCE,
      DoubleSerializerDeserializer.INSTANCE,
      DoubleSerializerDeserializer.INSTANCE });
FileScanOperatorDescriptor rect1Scanner =
      new FileScanOperatorDescriptor(spec, rect1SplitsProvider,
      new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
            IntegerParserFactory.INSTANCE,
            DoubleParserFactory.INSTANCE,
            DoubleParserFactory.INSTANCE,
            DoubleParserFactory.INSTANCE,
            DoubleParserFactory.INSTANCE }, ','),
      rect1Desc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, rect1Scanner, NC1_ID);
The FileSplit array defines how to read the input. The FileScanOperatorDescriptor defines how to parse the input file and transforms it into in-memory binary format. The RecordDescriptor defines how to directly parse and process the in-memory binary format without having to transform it into a Java class. An exactly similar code is used to define the second input file.
For the output, we need first to define how the records are stored in-memory which is the only thing needed for the plane sweep operator to work.
RecordDescriptor outputDesc =
  new RecordDescriptor(new ISerializerDeserializer[] {
    IntegerSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    IntegerSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE,
    DoubleSerializerDeserializer.INSTANCE });
After that, the operator is initialized and assigned to a virtual running node NC1_ID.
PlaneSweepJoinOperatorDescriptor join =
    new PlaneSweepJoinOperatorDescriptor(spec, outputDesc,
    new SpatialOverlapPredicate());
PartitionConstraintHelper.
  addAbsoluteLocationConstraint(spec, join, NC1_ID);
After that, the output file is defined to tell Hyracks how to write the final result to a file.
ResultSetId rsId = new ResultSetId(1);
spec.addResultSetId(rsId);
IOperatorDescriptor printer =
    new ResultWriterOperatorDescriptor(spec, rsId, false, false,
    ResultSerializerFactoryProvider.INSTANCE.
      getResultSerializerFactoryProvider());

PartitionConstraintHelper.
  addAbsoluteLocationConstraint(spec, printer, NC1_ID);
Finally, the input sources are connected to the plane sweep operator, which is then connected to the final output.
// Connect the two inputs
IConnectorDescriptor input1Conn = new OneToOneConnectorDescriptor(spec);
spec.connect(input1Conn, rect1Scanner, 0, join, 0);

IConnectorDescriptor input2Conn = new OneToOneConnectorDescriptor(spec);
spec.connect(input2Conn, rect2Scanner, 0, join, 1);

// Connect the output
IConnectorDescriptor outputConn = new OneToOneConnectorDescriptor(spec);
spec.connect(outputConn, join, 0, printer, 0);

spec.addRoot(printer);

Now the job is ready to run using the following line.
runTestAndStoreResult(spec, new File("sj_test_output"));
Currently, the output is just written to a text file which I manually check for correctness.

Downloads

All source code for this tutorial can be found at
https://sites.google.com/site/aseldawyblogfiles/files
This includes a simple test case and its expected output.

1 comment: