
Friday, December 12, 2014

Implementing software occlusion culling for real-time applications

Abstract

The visualization of complex virtual scenes can be significantly accelerated by applying Occlusion Culling. In this work we introduce a variant of the Hierarchical Occlusion Map method intended for Real-Time applications. To avoid using the real objects' geometry, we generate specialized conservative Occluders based on Axis-Aligned Bounding Boxes, which are converted into coplanar quads and then rasterized on the CPU into a downscaled Depth Buffer. We implement this method in a 3D scene using a software occlusion map rasterizer module specifically optimized to rasterize Occluder quads into a Depth Buffer. We demonstrate that this approach effectively increases the number of occluded objects without generating significant runtime overhead.

View full paper: DOWNLOAD

Screenshots


SQL query on spreadsheet

Introduction

Sometimes I have Excel spreadsheets and I need to run some kind of query on them: extracting all the unique values from a column, doing a join between two tabs, or counting how many occurrences there are of some value.
Most of the time you can combine Excel functions to obtain what you need. There are Excel gurus that can solve almost anything with them.
But as a developer I'm sometimes in a hurry and would prefer to just execute a SQL query on my sheets, without having to learn a new function to obtain what I need.
So I created a small Windows tool called ExcelSQL that allows you to load CSV or XLS files and then run arbitrary SQL queries on them.
You can download it from here: DOWNLOAD

Features

  • Windows app to load CSV and XLS files (no XLSX for the moment) and perform any SQL query on those files.
  • You can do joins, use aggregation functions, sub-queries, and everything else that is supported by the ODBC Text driver.
  • Save results to a file
  • Use results as a new table to join in another query

Examples

Let's suppose we have the following spreadsheet called "products" with a bunch of items and some attributes:

If, for example, we want to know how many different types of products we have, we can open the app and load this file:



The app will show the file in the right panel and will open a new SQL sheet where we can write an arbitrary query. Instead of tables we have files, and each file must be referenced with a $ at the beginning of its name; in this case: $products.
Below the query, the app shows all the rows and columns that satisfy it. The first row of each file is always considered to be the header, and the column names are extracted from it.



Then we can obtain the different types of products by executing the following query:


We can also see how many products we have of each type:



More features

Imagine now that we have another spreadsheet called "references" with the description of each type.



We want to add those descriptions to our result. In that case we can also load the other file and execute a query with a JOIN statement:




We can then save this report by clicking "Save results", or we can create a temporary table to use these values in another query. To do that, we click "Create table" and a new file containing the current result is created. Those results can then be used in another query.
For example, we may want to list all the products that belong to a category that has more than 25 products. In that case we can use the previous report instead of recomputing it.



Implementation

The application was developed in C# for Windows using Visual Studio 2010 and .NET framework 4.0.

File import

It supports two types of files: CSV and XLS. CSV files are copied from their original location to a temporary folder; all files end up in that same temporary folder.
XLS files are loaded through OLE DB using the Microsoft.Jet.OleDb provider. The first sheet is identified and then queried with a SELECT * statement, and its content is saved as a CSV file in the same temporary folder.

//Read file and convert to CSV
OleDbConnection conn = new OleDbConnection("Provider=Microsoft.Jet.OleDb.4.0;Data Source=" + srcFilePath + ";Extended Properties=Excel 8.0");
conn.Open();

//Obtain first sheet
string firstSheet = null;
DataTable schemaDt = conn.GetSchema("Tables");
for (int i = 0; i < schemaDt.Rows.Count; i++)
{
 string tableType = schemaDt.Rows[i].ItemArray[schemaDt.Columns.IndexOf("TABLE_TYPE")].ToString();
 string sheetName = schemaDt.Rows[i].ItemArray[schemaDt.Columns.IndexOf("TABLE_NAME")].ToString();
 if (tableType == "TABLE" && sheetName.Contains("$"))
 {
  firstSheet = sheetName;
  break;
 }
}
if (firstSheet == null)
{
 mainForm.showError("Import file error", "Could not find first sheet in excel file: " + srcFilePath);
 return false;
}

//Query sheet
OleDbDataAdapter adapter = new OleDbDataAdapter("SELECT * FROM [" + firstSheet + "]", conn);
OleDbCommandBuilder commandBuilder = new OleDbCommandBuilder(adapter);
DataTable dataTable = new DataTable();
adapter.Fill(dataTable);

//Create CSV file
using (var writer = new StreamWriter(dstFilePath))
{
 //columns
 for (int i = 0; i < dataTable.Columns.Count; i++)
 {
  string colName = dataTable.Columns[i].ColumnName;
  if (i != dataTable.Columns.Count - 1)
  {
   writer.Write(colName + ",");
  }
  else
  {
   writer.WriteLine(colName);
  }
 }
 writer.Flush();

 //rows
 for (int i = 0; i < dataTable.Rows.Count; i++)
 {
  object[] cells = dataTable.Rows[i].ItemArray;
  for (int j = 0; j < cells.Length; j++)
  {
   object value = cells[j];
   string cellValue = value != null ? value.ToString() : "";
   if (j != cells.Length - 1)
   {
    writer.Write(cellValue + ",");
   }
   else
   {
    writer.WriteLine(cellValue);
   }
  }
  writer.Flush();
 }
}

XLSX files cannot be opened with the Microsoft.Jet.OleDb provider, so they are not supported for now. You have to save them in CSV or XLS format first.


Queries

CSV files are queried using ODBC with the Microsoft Text Driver. All files must be in the same folder in order to perform joins.
We first adapt the query to remove the "$" character and replace each table name with [table_filename]. Then we execute the query and store its result in a DataTable.

OdbcConnection connection = null;
try
{
 //Adapt table names
 queryText = adaptQueryToExecute(queryText, this.mainForm.TableFiles);

 //Open ODBC connection to temp folder
 string connectionStr = @"Driver={Microsoft Text Driver (*.txt; *.csv)};Dbq=" + mainForm.TempTablesPath + ";Extensions=csv,txt";
 connection = new OdbcConnection(connectionStr);
 connection.Open();

 //Execute query
 OdbcDataAdapter odbcAdapter = new OdbcDataAdapter(queryText, connection);

 //Fill dataGrid
 DataTable dataTable = new DataTable();
 odbcAdapter.Fill(dataTable);
 dataGridViewResults.DataSource = dataTable;
 dataGridViewResults.Refresh();
 mainForm.log(dataGridViewResults.Rows.Count + " rows loaded");

 //Close connection
 connection.Close();
}
catch (Exception e)
{
 string error = "Error executing query:\n" + e.Message;
 if (e.InnerException != null)
 {
  error += Environment.NewLine + e.InnerException.Message;
 }
 mainForm.logError(error);
}
finally
{
 if (connection != null)
 {
  connection.Close();
 }
}


Conclusions

If you have to analyse some spreadsheets and don't want to deal with Excel functions, this tool may be useful for you, especially if you are a developer with strong SQL skills and almost no knowledge of Excel.
You can download it from here: DOWNLOAD
Some improvements that could be added:

  • ODBC does not support many advanced SQL features and its performance is not very good for large files. It may be worth replacing it with an embedded database such as SQLite or Firebird.
  • Support for XLSX files should also be added.
  • If an XLS file contains several tabs, we should be able to load all or some of them instead of only the first one.



Simple Hadoop SQL engine

Introduction

In this post I'm going to show how to write a small and simple SQL engine in Java on top of the Hadoop MapReduce framework. The idea is to have a bunch of files that are treated as tables and then to execute SQL queries on them.
Similar tools already exist, like Apache Hive, Apache Pig and Cloudera Impala.
So why am I doing my own implementation? If you want to optimize something you normally need to know how it works internally, and one of the best ways to learn that is to build the tool yourself. So that's exactly what I'm doing. Besides, it's also fun!

Keep in mind that my solution will be much worse and slower, and will lack most of the features included in Hive or Impala. But it's also simpler, more compact and easier to learn, so it may be a good starting point if you want to understand what's going on behind the scenes.

If you just want to jump directly to the code then go here: DOWNLOAD
I'm currently using Hadoop version 2.5.1. No other dependencies.

Example

Let's suppose we run some kind of store and we have a database with 4 tables:
  • sale: the main table we want to analyse; it contains information about each purchase.
  • user: reference table with information about users.
  • product: reference table with information about products.
  • store: reference table with information about stores.
Sample ERD diagram with one main table and three reference tables
For each table you can see the name of each column and also its index (we are going to use the indices later).
In this example we only have one big main table, sale, with millions of rows, and then 3 small tables (thousands of rows) that are only used to gather reference data.

If we want to know for example how many sales we have for each store we could do something like this:

SELECT sale.store_id, COUNT(sale.product_id) as amount
FROM sale
GROUP BY sale.store_id 
ORDER BY amount DESC

But for this kind of report we normally want to see descriptions instead of IDs. It would be great to show the store name instead of its ID, so we would normally prefer to do:

SELECT store.city, COUNT(sale.product_id) as amount
FROM sale JOIN store ON sale.store_id = store.store_id
GROUP BY store.city
ORDER BY amount DESC

This is the kind of scenario I want to solve with my SQL engine: one big main table to be analysed and many small tables used to pick up some reference data; an enrichment of the attributes of the main table.
We could just write a MapReduce app to generate this specific report, but I wanted to create a more general tool, capable of solving many (but not all) SQL queries. And of course I wanted that solution to work on top of the Hadoop infrastructure in order to parallelize as much as possible.

Features and limitations

So with the previous example in mind I made a list of features I will or will not support in my implementation. Some limitations are due to the specific kind of problem I want to solve; others are just there to simplify the syntax of a SQL query, because I don't want to spend much time writing a full-featured parser.
  • Only SQL queries, no DML commands like INSERT or UPDATE.
  • Only a limited subset of features is supported for a query: SELECT, FROM, WHERE, GROUP BY and ORDER BY.
  • Join limitations: 
    • Only inner join is supported. The other types of joins are not implemented.
    • There can only be one big main table: the first one specified in FROM.
    • The rest of the tables in FROM are considered to be small enough to fit completely in memory (that's one of the biggest limitations of my solution).
    • No cross join is supported; if you want to use several tables you have to specify how to join them.
    • Only one-to-one and many-to-one joins are allowed, no many-to-many or one-to-many. For example, store_id is considered to be the primary key of the store table, so a store_id from sale will only match one record from store.
    • You can only specify joins between the main table and the reference tables, not between reference tables. For example, you can specify a join between sale and store, or between sale and user, but not between store and user. All joins go to the main table.
  • Syntax limitations:
    • Columns are specified with the table name and the column index (from 0 to N-1):
      • Example: SELECT sale.0 instead of SELECT sale.user_id
      • You always have to specify the table name before the index.
      • Aliases are not supported
      • SELECT * or SELECT table.* is not supported. You have to specify each column.
    • Joins can only be specified inside FROM, not in WHERE:
      • You can do: SELECT ... FROM a JOIN b ON a.x = b.y
      • But you cannot do SELECT ... FROM a, b WHERE a.x = b.y
    • The only supported type for columns is String (no Integer, Float, etc), this implies that:
      • You can only compare against a String. For example: table.column = '51' (even if the value was a number).
      • If you use a comparison operator like > or <= I will be doing a character comparison and not a numerical one.
      • When sorting I'll be doing character comparisons and not numerical ones.
    • No math expressions, like SELECT (column_1 * 2 + column_3) / 5
  • Columns in GROUP BY must be the same as the columns in SELECT (ignoring aggregate functions). This means you cannot group by some columns but then show others.
  • Columns in ORDER BY must be specified with an index (from 0 to N-1). Example: ORDER BY 1 DESC
  • Aggregation functions that are supported:
    • COUNT, SUM, AVG, MAX, MIN
    • You cannot do COUNT(*), you always have to specify the column that you want to count. Example: COUNT(sale.1)
  • SQL features not implemented:
    • HAVING is not supported... yet.
    • No LIMIT, TOP, IN, EXISTS, BETWEEN, sub-selects, DISTINCT, etc.
    • I haven't implemented any of the typical SQL functions like CONCAT, LENGTH, TRIM, etc., and you cannot create your own functions.
    • Tables don't have indexes, primary keys, foreign keys, triggers, constraints, etc.
As you can see... the list of restrictions is pretty long. Many of them could be removed if I had more time to write a proper SQL parser, while others are simply not possible to implement with my current architecture.

The previous query that we used for our example can be rewritten for my engine in the following way (using the column indices from the diagram: sale.1 = product_id, sale.2 = store_id, store.0 = store_id, store.1 = city):

SELECT store.1, COUNT(sale.1)
FROM sale JOIN store ON sale.2 = store.0
GROUP BY store.1
ORDER BY 1 DESC

Architecture


Tables

Tables are assumed to be stored in a similar way to Hive: each table has a directory in HDFS whose name matches the table name. Inside that directory there can be one or many HDFS files (their names don't matter). Each file consists of a bunch of lines where each line is a row of the table, and each column can be parsed knowing the column delimiter, like a CSV file.
Hive uses a component called the Metastore to persist metadata about each table, for example the type of each column, the size of the table, indices, etc. In my simple architecture I'm not using anything like that. I don't have any metadata for a table, and that produces some of the restrictions I mentioned before:
  • I don't know the type of each column, so I assume everything is of type String.
  • I don't know the name of each column, so you have to use column indices to refer to them.
  • I don't know the size of a table, so I don't know whether it can fit entirely in memory. I just assume the first table specified in FROM is huge while the following tables are small and can be cached in memory.
There is also no CREATE TABLE command. Just create an HDFS directory and move your data there (somewhat similar to an external table in Hive).
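
For illustration, this is roughly how a table could be "created" under these assumptions; the base path, file names and delimiter conventions below are made up for the example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateTableExample {
 public static void main(String[] args) throws Exception {
  //Hypothetical base directory where the engine expects to find its tables
  String basePath = "/user/sqlengine/tables";

  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);

  //"Creating" the sale table is just creating its directory...
  Path tableDir = new Path(basePath + "/sale");
  hdfs.mkdirs(tableDir);

  //...and copying one or more delimited text files into it.
  //Each line is a row; columns are separated by the engine's delimiter.
  hdfs.copyFromLocalFile(new Path("/tmp/sale_2014.csv"), new Path(tableDir, "part-0000"));
 }
}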

Strategy

We have two Map-Reduce jobs:
  1. The first one is used for almost everything: joins, WHERE, SELECT and GROUP BY:
    • Mapper: performs the joins, applies the WHERE filters and picks the columns used in SELECT and GROUP BY.
    • Reducer: computes the aggregation functions.
  2. The second one is used only for sorting (if needed).
These are the main steps involved in executing a query:
Main components and interactions
  • The SQL query is parsed and validated. Main and reference tables are identified.
  • The main table is set up as the input of the first job mapper.
  • The reference tables are loaded in Hadoop distributed cache.
  • The first job is executed:
    • Mapper: 
      1. It reads each line and converts it to a row with columns.
      2. For each join, I check whether the join conditions are satisfied between the reference tables and the current row of the main table. If so, I append all the columns.
      3. I apply the WHERE conditions to see if the current joined row satisfies them. If this test fails, I skip the current line.
      4. I extract the columns needed for SELECT and output them as the KEY for the Reducer (one after the other, concatenated by a delimiter).
      5. If there is a GROUP BY, I output the columns needed for the aggregation functions as the VALUE for the Reducer (one after the other, concatenated by a delimiter).
    • Reducer:
      1. If I don't need a GROUP BY then I just output the KEY received from the Mapper as the result KEY of the Reducer. I don't care about the value (I just write an empty Text).
      2. If there is a GROUP BY then I loop through all the values received in the Reducer for that grouping key. For each value I update the corresponding aggregation functions. Then I output the columns received in the KEY from the Mapper plus the results of the aggregation functions (in the proper order). As in the previous case, I don't care about the value I output.
  • If I need to do ORDER BY then I execute the second job:
    • The Mapper just reads each line and outputs it using the same KEY (a fixed value). All columns are outputted as the VALUE for the Reducer (one after the other, concatenated by a delimiter).
    • The Reducer receives all the rows (because they all share the same key), saves them in memory and then performs an in-memory sort (if you don't have enough memory it blows up).

Implementation


SQL parsing

The string representing the SQL sentence is parsed using a custom-made, 100% hand-coded SQL parser. There is nothing new to learn here, and it's definitely not the proper way to do it if you want to cover all SQL features.

//Create config
Configuration mainConf = new Configuration();

//Parse sql
SqlParser sqlParser = new SqlParser(sql, true);

//Store original sql string as a config parameter
mainConf.set("sql", sql);

You can take a look at the parser here.
The string is parsed and stored in internal objects that are easy to interact with. This is done in the main class where the jobs are assembled.
Class diagram for the classes used to store SQL components after parsing has been performed

These objects are also needed inside the mappers and reducers. The correct thing to do would be to serialize the internal objects (for example as a plain JSON string) and send that to the mappers and reducers as a configuration parameter.
However, instead of doing that, I just pass the original SQL string to each mapper and reducer and parse it again there (without running the validations). It's simpler to parse everything again than to deal with serialization (although it's slower).
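
As a minimal sketch, reading and re-parsing the query on the Mapper side could look like the lines below (the second SqlParser argument is assumed to disable validation, mirroring the driver code above; the real setup method also loads the cached tables, as shown later):

//Inside the Mapper
private SqlParser sql;

public void setup(Context context) throws IOException, InterruptedException {
 //Read the original SQL string that the driver stored in the job configuration
 String sqlString = context.getConfiguration().get("sql");

 //Re-parse it locally instead of deserializing the driver's objects
 sql = new SqlParser(sqlString, false);

 super.setup(context);
}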

First job setup

I create a new Map-Reduce job:

//Create config
Configuration mainConf = new Configuration();

//Create job
Job mainJob = Job.getInstance(mainConf, "SqlEngine-Main");

//Set Map and Reduce
mainJob.setOutputKeyClass(Text.class);
mainJob.setOutputValueClass(Text.class);
mainJob.setJarByClass(SqlEngine.class);
mainJob.setMapperClass(SqlEngineMapper.class);
mainJob.setReducerClass(SqlEngineReducer.class);

I then specify the main table as the input of this job:

//Add main table to input
Path mainTablePath = new Path(basePath + "/" + sqlParser.mainTable);
FileInputFormat.addInputPath(mainJob, mainTablePath);

Then I add all reference tables to the distributed cache:

//Add secondary tables to distributed cache
for (SqlParser.Join join : sqlParser.joins) {
 
 //Path to table dir
 Path joinTableDir = new Path(basePath + "/" + join.table + "/");

 //Check how many files we have inside that dir
 FileStatus[] fileStatus = hdfs.listStatus(joinTableDir);
 if(fileStatus.length == 0) {
  //Error, the table directory is empty
  throw new RuntimeException("Table is empty: " + join.table);

 } else if(fileStatus.length == 1) {
  //Add the single file to the distributed cache
  mainJob.addCacheFile(fileStatus[0].getPath().toUri());

 } else {
  //There are many files, merge them all into one temp file
  Path tmpMergeFile = new Path(tmpDir + "/" + join.table + ".tmp");
  Utils.mergeFiles(hdfs, joinTableDir, tmpMergeFile);
  mainJob.addCacheFile(tmpMergeFile.toUri());
 }
}

The directory for a specific table may contain more than one file. In that case I have to load them all in the distributed cache and then treat them as a single table in the Mapper. To simplify that part, I just create a temporary file where I merge all the files into one.
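
Utils.mergeFiles itself is not shown in the post; a minimal sketch of what it could look like, assuming it simply concatenates every file in the source directory into the destination file (and that each file ends with a newline), is:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class Utils {
 //Concatenate every file inside srcDir into a single dstFile
 public static void mergeFiles(FileSystem hdfs, Path srcDir, Path dstFile) throws IOException {
  FSDataOutputStream out = hdfs.create(dstFile, true);
  try {
   for (FileStatus status : hdfs.listStatus(srcDir)) {
    if (status.isFile()) {
     FSDataInputStream in = hdfs.open(status.getPath());
     try {
      //Copy the whole file; keep the output stream open for the next file
      IOUtils.copyBytes(in, out, 4096, false);
     } finally {
      in.close();
     }
    }
   }
  } finally {
   out.close();
  }
 }
}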

Finally the first job is executed:

//Execute
mainJob.waitForCompletion(true);

Mapper

The first job Mapper receives the following data as input:
  • The main table files, line by line.
  • The reference tables from the distributed cache.
  • The parsed SQL structure: the tables we need, columns, joins, etc.
The output of the Mapper is:
  • Key: Text, containing the values of the columns specified in SELECT and GROUP BY, separated by a delimiter: column1,column2,column3
  • Value: Text; if we don't need a GROUP BY then it is an empty String. Otherwise I output the values of the columns used by the aggregation functions in SELECT (more on that later), also separated by a delimiter: column1,column2,column3 (see the example right after this list).
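
For example, for the query SELECT store.1, COUNT(sale.1) FROM sale JOIN store ON sale.2 = store.0 GROUP BY store.1, a single input row could produce something like this (the values are made up):

Key:   New York
Value: 8421

Here "New York" is the joined store.1 (city) value and "8421" is the sale.1 (product_id) value that the COUNT aggregate will consume in the Reducer.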

Join

Joins are performed in the Mapper as its first step. The idea is to check whether the current row of the main table matches any of the rows of the reference tables I have in memory.
So, first, the reference tables that I put in the distributed cache are loaded into memory during the Mapper setup phase:

public void setup(Context context) throws IOException, InterruptedException {    
 //Load tables in memory
 FileSystem hdfs = FileSystem.get(context.getConfiguration());
 tables = new ArrayList<CachedTable>();
 if(context.getCacheFiles() != null) {
  for (URI uri : context.getCacheFiles()) {
   tables.add(new CachedTable(hdfs, new Path(uri), columnSeparator));
  }
 }

 super.setup(context);
}

CachedTable is just a helper class that reads a file from HDFS and stores each line as a list of rows in memory:

public class CachedTable {

 String table;
 List<Record> rows;

 public CachedTable(FileSystem hdfs, Path path, String sep) {
  rows = new ArrayList<Record>();

  //Obtain table name from the file name (without its extension)
  table = path.getName();
  table = table.substring(0, table.lastIndexOf('.'));

  //Read HDFS file
  BufferedReader reader = null;
  try {
   reader = new BufferedReader(new InputStreamReader(hdfs.open(path)));
   String line;

   //Store each line in memory as an instance of Record
   while((line = reader.readLine()) != null) {
    rows.add(new Record(line, sep));
   }
  } catch (IOException e) {
   throw new RuntimeException("Error loading table in memory: " + path, e);
  } finally {
   try {
    if(reader != null) {
     reader.close();
    }
   } catch (IOException e) {
    //Ignore errors while closing the reader
   }
  }
 }
}

Record is another helper class that splits a line and stores its columns:

public class Record {
 int columnsCount;
 String[] columns;

 public Record(String line, String sep) {
  String[] split = line.split(sep);
  columnsCount = split.length;
  columns = new String[columnsCount];
  for (int i = 0; i < split.length; i++) {
   columns[i] = split[i].trim();
  }
 }

 //Return the value of the column at the given index (used by the joins, WHERE and SELECT)
 public String getValue(int column) {
  return columns[column];
 }
}

With all that in place, I have to check whether the current row satisfies its join conditions with any row of the reference tables.
For example, if I have:

FROM sale JOIN store ON sale.2 = store.0

I have to loop through the entire store table that we have in memory and test whether column index 0 of any of those rows equals column index 2 of the current row.

Perform a JOIN between the main table and a reference table
If there is no row in store that satisfies the join condition, the Mapper skips the current row. If a match is found, I pick both rows, the current one and the one found in the reference table, and add them to a Map. Those are the columns I'm going to use to perform WHERE, SELECT and GROUP BY.

Map storing the two rows that have been joined

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 //Current row of the main table
 Record currentRow = new Record(value.toString(), columnSeparator);

 //Joins: search in cached tables and merge records into one
 Map<String, Record> joinedRows = new HashMap<String, Record>();
 joinedRows.put(sql.mainTable, currentRow);
 for (Join join : sql.joins) {
  CachedTable joinTable = getTable(join.table);
  for (JoinClause joinClause : join.joinClauses) {
   String currentRowValue = currentRow.getValue(joinClause.localColumn);

   //Perform a full table search by that column
   Record joinRow = joinTable.searchByColumn(joinClause.foreignColumn.column, currentRowValue);

   //Join not satisfied, skip the current row
   if(joinRow == null) {
    return;
   }
   joinedRows.put(join.table, joinRow);
  }
 }
}

public class CachedTable {
 //Full table scan
 public Record searchByColumn(int column, String value) {
  for (Record row : rows) {
   if(row.getValue(column).equals(value)) {
    return row;
   }
  }
  return null;
 }
}


Where

WHERE filters are also applied in the Mapper, after the joins. The Map with the joined records is used to pick up the columns that we need to test in the WHERE conditions.
There is nothing special in this: I evaluate each condition and, if it is not satisfied, I skip the current row.
These tests are performed recursively. There are two kinds of WHERE conditions:

  • Simple condition: =, !=, >, >=, <, <= and LIKE.
  • Compound condition: made of two conditions (simple or compound) and a conditional operator (AND, OR).

It's similar to the Composite design pattern.
We also perform an early-out if the left condition of a compound condition is not satisfied and the operator is AND.

//Where: apply filters
if(sql.whereClause != null) {

 //Apply nested filters recursively
 boolean result = applyWhereClauseRecursive(joinedRows, sql.whereClause);

 //Skip current line if the filters were not satisfied
 if(!result) {
  return;
 }
}

private boolean applyWhereClauseRecursive(Map<String, Record> joinedRows, WhereClause clause) {
 //Compound expression
 if(clause instanceof WhereCompoundClause) {
   WhereCompoundClause compClause = (WhereCompoundClause)clause;

   //Apply left clause first
   boolean leftResult = applyWhereClauseRecursive(joinedRows, compClause.leftClause);
   //Early abort if using an AND operator
   if(!leftResult && compClause.operator == ConditionalOperator.AND)
     return false;

   //Apply right clause
   boolean rightResult = applyWhereClauseRecursive(joinedRows, compClause.rightClause);

   //Apply conditional operator
   boolean result;
   if(compClause.operator == ConditionalOperator.AND) {
    result = leftResult && rightResult;
   } else {
    result = leftResult || rightResult;
   }
   return result;

 //Final expression
 } else {
  //Get value to compare
  WhereSimpleClause simpleClause = (WhereSimpleClause)clause;
  String colValue = joinedRows.get(simpleClause.column.table).getValue(simpleClause.column.column);

  //Apply operator
  boolean result = false;
  switch (simpleClause.operator) {
   case EQUALS:
    result = colValue.toLowerCase().equals(simpleClause.value);
    break;
   case NOT_EQUALS:
    result = !colValue.equals(simpleClause.value);
    break;
   case LIKE:
    result = colValue.toLowerCase().contains(simpleClause.value);
    break;
   case GREATER:
    result = colValue.compareTo(simpleClause.value) > 0;
    break;
   case GREATER_EQ:
    result = colValue.compareTo(simpleClause.value) >= 0;
    break;
   case LOWER:
    result = colValue.compareTo(simpleClause.value) < 0;
    break;
   case LOWER_EQ:
    result = colValue.compareTo(simpleClause.value) <= 0;
    break;
  }

  return result;
 }
}

Select

SELECT is handled in the Mapper after WHERE.
I loop through the columns that have been specified in the SELECT part of the SQL sentence and pick the value of those columns from the Map with the joined records.
I output those values as the Reducer KEY, separated by a delimiter.

//Select: leave only the columns that we want to see. Put them in reducer key
List<String> keyColumns = new ArrayList<String>();
for (SelectColumn selectColumn : sql.selectColumns) {
 //Constant: just add the value
 if(selectColumn.type == SelectColumnType.CONSTANT) {
  keyColumns.add(selectColumn.constant);
  
 //Regular column: add column value
 } else if(selectColumn.type == SelectColumnType.COLUMN) {
  String colValue = joinedRows.get(selectColumn.column.table).getValue(selectColumn.column.column);
  keyColumns.add(colValue);
 }
}

Group by

I imposed the restriction that the GROUP BY columns must match the SELECT columns (except for aggregation functions), so the grouping has already been solved in the previous step: all values with the same key will go to the same Reducer call, and that is where I can compute the aggregation functions.
The only thing that remains to be done in the Mapper is to output the values that the Reducer needs for the aggregate functions. I output those columns in the VALUE for the Reducer, separated by a delimiter:

//Aggregation columns: put them in reducer value
List<String> valueColumns = new ArrayList<String>();
for (SelectColumn selectColumn : sql.selectColumns) {
 if(selectColumn.type == SelectColumnType.AGGREGATE) {
  String colValue = joinedRows.get(selectColumn.column.table).getValue(selectColumn.column.column);
  valueColumns.add(colValue);
 }
}

Reducer

The Reducer receives the following as input:

  • Key: the GROUP BY / SELECT columns I outputted in the Mapper.
  • Values: the value of each column that we need to compute the aggregation functions.
The output will be:
  • Key: all the columns that the user wants to see (separated by a delimiter).
  • Value: an empty String (I don't care about the value).
If the SQL sentence doesn't require GROUP BY then I can just output what I received as the key. Otherwise I have to compute the aggregation functions: I loop through each aggregation function specified in SELECT and update the value of the corresponding calculation:


//Parse grouping columns from key
Record groupColumns = new Record(key.toString(), columnSeparator);

//Init all aggregate values
int count = 0;
double sum = 0;
double max = Double.NEGATIVE_INFINITY; //Double.MIN_VALUE is the smallest positive double, so use the infinities instead
double min = Double.POSITIVE_INFINITY;

//Loop through each value that we have to aggregate
for (Text aggregateItem : values) {
 Record aggregateColumns = new Record(aggregateItem.toString(), columnSeparator);
 
 //Loop trough aggregate columns
 int aggIndex = 0;
 for (SelectColumn c : sql.selectColumns) {
  if(c.type == SelectColumnType.AGGREGATE) {
   
    String colValue = aggregateColumns.getValue(aggIndex);
   
   //Count
   if(c.function == AggregateFunction.COUNT || c.function == AggregateFunction.AVG) {
    count++;
   }
   
   //Sum
   if(c.function == AggregateFunction.SUM || c.function == AggregateFunction.AVG) {
    double v = Double.parseDouble(colValue);
    sum += v;
   }
   
   //Max and Min
   if(c.function == AggregateFunction.MAX || c.function == AggregateFunction.MIN) {
    double v = Double.parseDouble(colValue);
    if(v > max) max = v;
    if(v < min) min = v;
   }
   
   aggIndex++;
  }
 }
}

The last step is to output the columns that the user wants to see, including the results of the aggregation functions, in the proper order:

//Generate final list of columns
List<String> resultColumns = new ArrayList<String>(sql.selectColumns.size());
int groupColIndex = 0;
for (SelectColumn c : sql.selectColumns) {
 
 //Add constant or column value
 if(c.type == SelectColumnType.CONSTANT || c.type == SelectColumnType.COLUMN) {
  resultColumns.add(groupColumns.getValue(groupColIndex));
  groupColIndex++;
  
 //Add aggregate calculation
 } else if(c.type == SelectColumnType.AGGREGATE) {
  switch (c.function) {
  case COUNT:
   resultColumns.add(String.valueOf(count));
   break;
  case SUM:
   resultColumns.add(Utils.printDouble(sum));
   break;
  case MAX:
   resultColumns.add(Utils.printDouble(max));
   break;
  case MIN:
   resultColumns.add(Utils.printDouble(min));
   break;
  case AVG:
   double avg = sum / (double)count;
   resultColumns.add(Utils.printDouble(avg));
   break;
  }
 }
}

outputWritable.set(getColumsOutput(resultColumns, columnSeparator));

Order by

To perform ORDER BY I use a second job. This job reads the output generated by the first job and sorts it.
The Mapper doesn't do anything important (it's just an identity mapper). The Reducer receives all the rows and performs an in-memory sort:

public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 //Load all rows in memory
 List<Record> rows = new ArrayList<Record>();
 for (Text text : values) {
  rows.add(new Record(text.toString(), columnSeparator));
 }
 
 //Sort rows based on order by columns
 Collections.sort(rows, new Comparator<Record>() {
  @Override
  public int compare(Record a, Record b) {
   int result = 0;
   for (SortClause sortClause : sql.sortClauses) {
    String value1 = a.getValue(sortClause.index);
    String value2 = b.getValue(sortClause.index);
    
    result = value1.compareTo(value2);
    if(result != 0) {
     result = sortClause.order == SortOrder.ASC ? result : -result;
     return result;
    }
   }
   return result;
  }
 });
 
 //Output sorted rows
 for (Record row : rows) {
  outputWritable.set(row.toString());
  context.write(outputWritable, empty);
 }
}

It must be noted that, for the previous solution to work properly, only one Reducer must be used. That is specified when creating the job:

sortJob.setNumReduceTasks(1);
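
For completeness, a rough sketch of how the whole sort job might be wired up in the driver (the intermediate and final output paths and the mapper/reducer class names are assumptions, since this part of the code is not shown in the post):

//Sketch of the second (sorting) job setup; paths and class names are illustrative
Configuration sortConf = new Configuration();
sortConf.set("sql", sql);

Job sortJob = Job.getInstance(sortConf, "SqlEngine-Sort");
sortJob.setJarByClass(SqlEngine.class);
sortJob.setMapperClass(SqlEngineSortMapper.class);    //identity-style mapper that emits a fixed key
sortJob.setReducerClass(SqlEngineSortReducer.class);  //the in-memory sorting reducer shown above
sortJob.setMapOutputKeyClass(IntWritable.class);
sortJob.setMapOutputValueClass(Text.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(Text.class);

//Read the output of the first job and write the final result somewhere else
FileInputFormat.addInputPath(sortJob, mainOutputPath);
FileOutputFormat.setOutputPath(sortJob, finalOutputPath);

//A single reducer so that all rows end up in one globally sorted file
sortJob.setNumReduceTasks(1);
sortJob.waitForCompletion(true);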

This solution will of course not work if the rows to sort don't fit into memory. A better approach would be to use a custom partitioner and divide the load into chunks; a sketch of that idea follows.
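
As a hint of what that could look like (this is not part of the current code): if the Mapper emitted the ORDER BY column as its output key instead of a fixed value, the framework would sort the keys, and a custom partitioner could assign key ranges to different reducers so that the concatenated reducer outputs end up globally ordered. A crude sketch with hard-coded split points follows; a real implementation would sample the data first (for example with Hadoop's TotalOrderPartitioner and InputSampler):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//Illustrative range partitioner: rows whose sort key starts with an "earlier" character
//go to lower-numbered reducers, so the reducer outputs can be concatenated in order.
public class RangePartitioner extends Partitioner<Text, Text> {
 @Override
 public int getPartition(Text key, Text value, int numPartitions) {
  String k = key.toString();
  char c = k.isEmpty() ? 'a' : Character.toLowerCase(k.charAt(0));
  if (c < 'a') c = 'a';
  if (c > 'z') c = 'z';
  int bucket = (c - 'a') * numPartitions / 26;
  return Math.min(bucket, numPartitions - 1);
 }
}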

Conclusion

The full source code can be downloaded from here: DOWNLOAD

There is still a lot to improve in the current implementation. More SQL features need to be implemented for it to become a general solution useful in many situations.
The restriction of having to load all the join tables in memory is too strong for many cases; an alternative join mechanism should be provided.
Order By should also be re-implemented to support arbitrarily large results.
However, the insight gained from doing my own implementation was very useful to understand how to do performance tuning on existing solutions like Hive, Pig and Impala.
