Friday, November 20, 2015

An Introduction to Hyracks Operators in AsterixDB

I had the opportunity to collaborate with the AsterixDB team, led by Mike Carey and Chen Li at the University of California, Irvine. The main objective of this collaboration is to introduce efficient ad-hoc spatial join query processing in AsterixDB. In this blog post, I will try to summarize my work for future developers and users of AsterixDB. I believe that this post could be very helpful as it is coming from an outsider, so I will try to keep it simple and will not assume any previous knowledge of AsterixDB. In fact, this is my first real interaction with AsterixDB system internals.

The Problem

I'm trying to solve the spatial join problem where the input is two tables, R and S, and a spatial predicate θ such as overlaps or touches. The output is every pair of records (r, s), where r∈R, s∈S, and θ is true for the pair (r,s). A naive approach is to perform a cross product of the two tables and test the predicate against every pair. However, our objective is to provide a more efficient algorithm.
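To make the problem concrete, here is a minimal plain-Java sketch of the naive cross-product approach over axis-aligned rectangles. This is not AsterixDB code; the Rect class and the overlaps test are my own illustration of the problem statement:

```java
import java.util.ArrayList;
import java.util.List;

class NaiveSpatialJoin {
    // A minimal axis-aligned rectangle; stands in for a real spatial record.
    static class Rect {
        final double x1, y1, x2, y2;
        Rect(double x1, double y1, double x2, double y2) {
            this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
        }
        // The predicate theta = "overlaps": true iff the rectangles intersect.
        boolean overlaps(Rect o) {
            return x1 <= o.x2 && o.x1 <= x2 && y1 <= o.y2 && o.y1 <= y2;
        }
    }

    // Naive O(|R| * |S|) join: test theta against every pair (r, s).
    static List<int[]> join(List<Rect> r, List<Rect> s) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < r.size(); i++)
            for (int j = 0; j < s.size(); j++)
                if (r.get(i).overlaps(s.get(j)))
                    result.add(new int[]{i, j});
        return result;
    }
}
```

An algorithm like PBSM improves on this by partitioning the space so that only records falling into the same partition ever need to be compared.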

The Solution

Our plan is to implement the partition-based spatial merge (PBSM) join algorithm as described in "Jignesh M. Patel and David J. DeWitt. Partition based spatial-merge join. In Proceedings of the 1996 ACM SIGMOD international conference on Management of data (SIGMOD '96)". In this blog post, we will focus on the implementation of this algorithm in AsterixDB. I'll assume that you already understand the PBSM algorithm. I might write a separate blog post that describes the PBSM algorithm, or I'll be happy to link to an existing page that describes it well.

Blog Series

In a series of blog posts, I'll try to summarize the experience I gained while working on AsterixDB. I'll write these blog posts as I'm working on the project so I don't miss any details. I thought of waiting until I had the whole experience and then writing about it, but writing along the way should be more beneficial as you'll be able to see the steps I took to reach the final product. In this blog post, I'll describe the basics of Hyracks operators. In future blog posts, I'll describe the implementation of the spatial join operator. I'll start with a very simple and basic implementation, then I'll keep updating it to get a more efficient implementation.

Hyracks Operators

Hyracks is the query processing engine of AsterixDB. Think of it as what the MapReduce programming paradigm is to Hadoop, or what the Resilient Distributed Dataset (RDD) is to Spark. It uses a directed acyclic graph (DAG) model, which is popular in many big data query processing engines including Apache Pig and Apache Spark. A typical operator has one or two inputs and one output. While this design looks simple, the implementation is made a little more complex to ensure high efficiency and a low memory footprint.

Example: Nested Loop Join

We illustrate the internal design of a Hyracks operator with the NestedLoopJoinOperator as an example. This is a simple operator that is also closely related to our target operator. The operator is shown in the figure below.
As shown in the figure above, the operator is internally broken into smaller components called ActivityNodes. An activity node is the smallest piece of work that is done on a stream of data. Activity nodes are connected with Edges. These edges define how activity nodes interact with each other. As the nested loop join operator is a binary operator, it has two source (input) edges connected to two activity nodes, JoinCacheActivityNode and NestedLoopJoinActivityNode. JoinCacheActivityNode reads all records in one input and stores (caches) them for later use. The BlockingEdge ensures that the JoinCacheActivityNode performs all of its work (i.e., reads all its input) before the NestedLoopJoinActivityNode starts working. After JoinCacheActivityNode finishes its work, the NestedLoopJoinActivityNode takes each record of the second input and compares it against all records of the first input, which have been cached. Finally, the matching pairs are sent to the output through the TargetEdge.
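The cache-then-probe control flow described above can be mimicked in plain Java. This is only an illustration of the two phases and their ordering, not the actual Hyracks activity-node API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

class NestedLoopJoinSketch<L, R> {
    private final List<L> cache = new ArrayList<>();
    private final BiPredicate<L, R> predicate;

    NestedLoopJoinSketch(BiPredicate<L, R> predicate) {
        this.predicate = predicate;
    }

    // Phase 1 (plays the role of JoinCacheActivityNode): consume the
    // entire first input and cache it before the join may start.
    void cacheBuildSide(Iterable<L> buildInput) {
        for (L l : buildInput) cache.add(l);
    }

    // Phase 2 (plays the role of NestedLoopJoinActivityNode): stream the
    // second input, comparing each record against every cached record.
    List<String> probe(Iterable<R> probeInput) {
        List<String> matches = new ArrayList<>();
        for (R r : probeInput)
            for (L l : cache)
                if (predicate.test(l, r)) matches.add(l + "," + r);
        return matches;
    }
}
```

The BlockingEdge corresponds to the rule that `probe` must not be called before `cacheBuildSide` has fully consumed its input.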

Data Frames

In Hyracks, all data processing happens over DataFrames, which are simply blocks of binary data. No actual data objects are ever created automatically by Hyracks. While this complicates the design and implementation of activity nodes and operators, it has two main advantages.
  1. It is very lightweight in memory compared to Java objects, which consume a lot of memory in metadata such as object headers and memory references.
  2. It is usually much faster, as all the processing is done over the raw binary data. An operator needs to parse only the parts of the data that it processes rather than the whole record.
So, as records are read from input files, they are parsed and kept in raw binary format. Multiple records can share the same data frame for memory and processing efficiency; however, a record cannot span multiple frames. Each activity node receives a set of data frames, and it is up to its implementation to decide what to do with them.
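As a rough illustration of how several records can share one fixed-size frame, here is a toy layout that packs length-prefixed byte records back to back. The real Hyracks frame format is different; this sketch only demonstrates the two rules above (multiple records per frame, no record spanning frames):

```java
import java.nio.ByteBuffer;

class FrameSketch {
    // A toy fixed-size frame packing length-prefixed records back to back.
    private final ByteBuffer buf;

    FrameSketch(int frameSize) {
        buf = ByteBuffer.allocate(frameSize);
    }

    // Append one record; returns false if it does not fit, since a
    // record is not allowed to span two frames.
    boolean append(byte[] record) {
        if (buf.remaining() < 4 + record.length) return false;
        buf.putInt(record.length);
        buf.put(record);
        return true;
    }

    // Count the records currently packed in this frame.
    int recordCount() {
        ByteBuffer ro = buf.duplicate();
        ro.flip();
        int n = 0;
        while (ro.remaining() >= 4) {
            int len = ro.getInt();
            ro.position(ro.position() + len);
            n++;
        }
        return n;
    }
}
```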

Helper Classes

You might ask: how do I deal with this binary data if I don't know how to parse it? That's a good question. AsterixDB does not leave you completely in the wild. It provides helper classes that understand the underlying format and, in some cases, can directly process records in their binary format. While there are many of them, I'm going to describe only the four classes I dealt with, namely ISerializerDeserializer, RecordDescriptor, IFrameTupleAccessor, and IPredicateEvaluator. These should give you a high-level idea of how things work.

ISerializerDeserializer

ISerializerDeserializer defines how to parse and store a single attribute. For example, it defines how to convert a double value into eight bytes, and how to read these eight bytes back into the double value.
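For example, the double round trip could look roughly like this. This is a simplified standalone sketch, not the actual ISerializerDeserializer interface:

```java
import java.nio.ByteBuffer;

class DoubleSerde {
    // Serialize a double into its eight-byte binary representation.
    static byte[] serialize(double v) {
        return ByteBuffer.allocate(8).putDouble(v).array();
    }

    // Deserialize those eight bytes back into the double value.
    static double deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }
}
```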

RecordDescriptor

A RecordDescriptor stores metadata about the record as a whole. This includes the serializer/deserializer of each attribute as well as the length of each attribute, if it is fixed-size.
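A toy version of such a descriptor might carry just a type name and a fixed length per field. This is my own illustration; the real RecordDescriptor holds the per-attribute serializer/deserializer objects described above:

```java
class RecordDescriptorSketch {
    // Per-field metadata: a type name and a fixed byte length
    // (-1 marks a variable-length field such as a string).
    final String[] fieldTypes;
    final int[] fixedLengths;

    RecordDescriptorSketch(String[] fieldTypes, int[] fixedLengths) {
        this.fieldTypes = fieldTypes;
        this.fixedLengths = fixedLengths;
    }

    int fieldCount() {
        return fieldTypes.length;
    }

    // Total record size if every field is fixed-length, else -1.
    int fixedRecordSize() {
        int total = 0;
        for (int len : fixedLengths) {
            if (len < 0) return -1;
            total += len;
        }
        return total;
    }
}
```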

IFrameTupleAccessor

The tuple accessor uses the RecordDescriptor to actually parse and access individual records in a data frame. With this class, you can iterate over the records in a frame and retrieve the data of a whole record or of a specific attribute within it.
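A stripped-down accessor over a frame of fixed-width tuples might look like the sketch below. I assume, purely for illustration, that every tuple consists of the same number of eight-byte double fields; the real IFrameTupleAccessor handles variable-length tuples as well:

```java
import java.nio.ByteBuffer;

class TupleAccessorSketch {
    // Accessor over a frame of fixed-width tuples: each tuple is
    // `fieldCount` doubles packed back to back (8 bytes per field).
    private final ByteBuffer frame;
    private final int fieldCount;

    TupleAccessorSketch(ByteBuffer frame, int fieldCount) {
        this.frame = frame;
        this.fieldCount = fieldCount;
    }

    int getTupleCount() {
        return frame.limit() / (8 * fieldCount);
    }

    // Read field `f` of tuple `t` directly from the binary frame;
    // no tuple object is ever materialized.
    double getDoubleField(int t, int f) {
        return frame.getDouble((t * fieldCount + f) * 8);
    }
}
```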

IPredicateEvaluator

The predicate evaluator takes two records and tests if they satisfy some predicate. I'll show in the next blog post how to use it to evaluate the spatial overlap predicate.
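The shape of such an evaluator can be sketched as a simple functional interface over two decoded records, here represented as rectangles of four doubles. This is an illustration only; the real IPredicateEvaluator works directly over binary tuples inside frames:

```java
@FunctionalInterface
interface PredicateSketch {
    // True iff the two records satisfy the predicate; each record is an
    // axis-aligned rectangle encoded as {x1, y1, x2, y2}.
    boolean evaluate(double[] r, double[] s);
}

class OverlapPredicate {
    // The spatial "overlaps" predicate: the two rectangles intersect.
    static final PredicateSketch OVERLAPS = (r, s) ->
            r[0] <= s[2] && s[0] <= r[2] && r[1] <= s[3] && s[1] <= r[3];
}
```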
