Introduction
In this post I'm going to show how to write a small and simple SQL engine in Java on top of the Hadoop MapReduce framework. The idea is to have a bunch of files that will be treated as tables and then to execute SQL queries on them.
Similar tools already exist, like Apache Hive, Apache Pig, and Cloudera Impala.
So why am I doing my own implementation? If you want to optimize something you normally need to know how it works internally. One of the best ways to learn that is to build the tool yourself. So that's exactly what I'm doing. Besides, it's also fun!
Keep in mind that my solution will be much worse and slower and will lack most of the features included in Hive or Impala. But it's also simpler, more compact and easier to learn, so it may be a good starting point if you want to understand what's going on behind the scenes.
If you just want to jump directly to the code then go here:
DOWNLOAD
I'm currently using Hadoop version 2.5.1. No other dependencies.
Example
Let's suppose we run some kind of store and we have a database with 4 tables:
- sale: the main table we want to analyse, containing information about each purchase.
- user: reference table with information about users.
- product: reference table with information about products.
- store: reference table with information about stores.
[Figure: Sample ERD diagram with one main table and three reference tables]
For each table you can see the name of each column and also its index (we are going to use those indexes later).
In this example we only have one main big table, sale, with millions of rows, and then 3 small tables (thousands of rows) that are only used to gather reference data.
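Since the ERD itself is not reproduced here, these are the column indexes that the queries in this post rely on (inferred from the examples below; the remaining columns of each table are not needed):
sale:  0 = user_id, 1 = product_id, 2 = store_id
store: 0 = store_id, 1 = city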
If we want to know for example how many sales we have for each store we could do something like this:
SELECT sale.store_id, COUNT(sale.product_id) as amount
FROM sale
GROUP BY sale.store_id
ORDER BY amount DESC
But for this kind of report we normally want to see descriptions instead of ids. It would be great to show the store name instead of its ID. So we would normally prefer to do:
SELECT store.city, COUNT(sale.product_id) as amount
FROM sale JOIN store ON sale.store_id = store.store_id
GROUP BY sale.store_id
ORDER BY amount DESC
This is the kind of scenario I want to solve with my SQL engine: one main big table to be analysed and many small tables that are used to pick up reference data. In other words, we enrich the attributes of the main table.
We could just write a Map-Reduce app to generate this report, but I wanted to create a more general tool, capable of solving many (but not all) SQL queries. And of course I wanted that solution to work on top of the Hadoop infrastructure in order to parallelize as much as possible.
Features and limitations
So with the previous example in mind I made a list of features I will or will not support in my implementation. Some limitations are due to the specific kind of problem I want to solve. Others are just there to simplify the syntax of a SQL query, because I don't want to spend much time writing a full-featured parser.
- Only SQL queries, no DML commands like INSERT or UPDATE.
- Only a limited subset of features is supported for a query: SELECT, FROM, WHERE, GROUP BY and ORDER BY.
- Join limitations:
- Only inner join is supported. The other types of joins are not implemented.
- There can only be one main big table, the first one specified in FROM.
- The rest of the tables in FROM are considered to be small enough to fit completely in memory (that's one of the biggest limitations of my solution).
- No cross-join is supported; if you want to use many tables you have to specify how to join them.
- Only one-to-one and many-to-one joins are allowed, no many-to-many or one-to-many. For example, store_id is considered to be the primary key of the store table, so a store_id from sale will only match one record from store.
- You can only specify joins between the main table and the reference tables, not between reference tables. For example, you can specify a join between sale and store, or sale and user, but not between store and user. All joins go through the main table.
- Syntax limitations:
- Columns are specified with the table name and the column index (from 0 to N-1):
- Example: SELECT sale.0 instead of SELECT sale.user_id
- You always have to specify the table name before the index.
- Aliases are not supported
- SELECT * or SELECT table.* is not supported. You have to specify each column.
- Joins can only be specified inside FROM, not in WHERE:
- You can do: SELECT ... FROM a JOIN b ON a.x = b.y
- But you cannot do SELECT ... FROM a, b WHERE a.x = b.y
- The only supported column type is String (no Integer, Float, etc). This implies that:
- You can only compare against a String. For example: table.column = '51' (even if the value was a number).
- If you use a comparison operator like > or <= I will be doing a character comparison and not a numerical one (see the small example right after this list).
- When sorting I'll also be doing character comparisons and not numerical ones.
- No math expressions, like SELECT (column_1 * 2 + column_3) / 5
- Columns in GROUP BY must be the same as the columns in SELECT (ignoring aggregate functions). This means you cannot group by some columns but then show others.
- Columns in ORDER BY must be specified with an index (from 0 to N-1). Example: ORDER BY 1 DESC
- Aggregation functions that are supported:
- COUNT, SUM, AVG, MAX, MIN
- You cannot do COUNT(*), you always have to specify the column that you want to count. Example: COUNT(sale.1)
- SQL features not implemented:
- HAVING is not supported... yet
- No LIMIT, TOP, IN, EXISTS, BETWEEN, Sub-selects, DISTINCT, etc.
- I haven't implemented any typical SQL function like CONCAT, LENGTH, TRIM, etc, and you cannot create your own functions.
- Tables don't have indexes, primary keys, foreign keys, triggers, constraints, etc.
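To make the character-comparison caveat concrete, here is what plain Java string comparison does with numbers stored as text (the engine compares all column values as strings, so it inherits this behaviour):
//Character comparison: "9" sorts after "10" because '9' > '1'
System.out.println("9".compareTo("10") > 0);                          //true
//A numerical comparison would of course say 9 < 10
System.out.println(Integer.parseInt("9") > Integer.parseInt("10"));   //false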
As you can see... the list of restrictions is pretty large. Many of them could be removed if I had more time to write a proper SQL parser, while others would simply not be possible to implement with my current architecture.
The query that we used in our example could be rewritten for my engine in the following way:
SELECT store.1, COUNT(sale.1)
FROM sale JOIN store ON sale.2 = store.0
GROUP BY store.1
ORDER BY 1 DESC
Architecture
Tables
Tables are assumed to be stored in a similar way to Hive. Each table has a directory in HDFS whose name must match the table name. Inside that folder there can be one or many HDFS files (their names don't matter). Each file consists of a bunch of lines where each line is a row of the table and each column can be parsed knowing the column delimiter, like a CSV file.
Hive uses a component called the Metastore to persist metadata about each table, for example the type of each column, the size of the table, indices, etc. In my simple architecture I'm not using anything like that. I don't have any metadata for a table, and that produces some of the restrictions I mentioned before:
- I don't know the type of each column so I assume everything is of type String.
- I don't know the name of each column so you have to use their indices to specify them.
- I don't know the size of a table so I don't know whether it fits entirely in memory. So I just assume the first table specified inside FROM is huge, while the following tables are small and can be cached in memory.
There is also no CREATE TABLE command. Just create an HDFS directory and move your data there (somewhat similar to an external table in Hive).
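As an illustration, the sale and store tables from the example could be laid out like this (a hypothetical layout; the base path, the file names and the comma delimiter are all assumptions):
/data/sale/part-00000     <- one or more data files, names don't matter
/data/sale/part-00001
/data/store/stores.csv
And a few lines of one of the sale files (user_id, product_id, store_id):
1001,27,5
1002,27,2
1001,43,5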
Strategy
We have two Map-Reduce jobs:
- The first one is used for almost everything: joins, where, select and group by:
- Mapper: performs the joins, applies the WHERE filters and picks the columns to be used in SELECT and GROUP BY.
- Reducer: computes the aggregation functions.
- The second one is used only for sorting (if needed)
The following is a list of the main steps involved to execute a query:
[Figure: Main components and interactions]
- The SQL query is parsed and validated. Main and reference tables are identified.
- The main table is set up as the input of the first job mapper.
- The reference tables are loaded in Hadoop distributed cache.
- The first job is executed:
- It reads each line and converts it to a row with columns.
- For each join I check whether the join conditions are satisfied between the reference tables and the current row of the main table. If so, I append all the columns.
- I apply the WHERE conditions to see if the current joined row satisfies them. If this test fails then I skip the current line.
- I extract the columns that I need for SELECT and output them as KEY for the Reducer (one after the other concatenated by a delimiter).
- If there is a Group By then I output the columns needed for aggregation functions as VALUES for the Reducer (one after the other concatenated by a delimiter).
- Reducer:
- If I don't need to do a Group By then I just output the KEY received from the Mapper as the result KEY of the Reducer. I don't care about the value (I just write an empty Text).
- If there is a Group By then I proceed to loop through all the values I received in the Reducer for that grouping key. For each value I update the corresponding aggregation function. Then I output the columns we received in the KEY from the Mapper plus the results of the aggregation functions (in the proper order). As in the previous case, I don't care about the value I output.
- If I need to do ORDER BY then I execute the second job:
- The Mapper just reads each line and outputs them using the same KEY (a fixed value). All columns are outputted as VALUES for the Reducer (one after the other concatenated by a delimiter).
- The Reducer receives all the rows (because I am using the same fixed key for all of them), saves them in memory and then performs an in-memory sort (if you don't have enough memory it blows up).
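To make the flow concrete, this is roughly what the example query from the beginning would produce at each stage (all the values shown here are invented):
One sale row:                  1001,27,5
First job Mapper output:       KEY = Boston      VALUE = 27    (store.1 after the join; sale.1 for COUNT)
First job Reducer output:      Boston,1250                     (the grouped column plus the COUNT result)
Second job (ORDER BY 1 DESC):  NewYork,2034
                               Boston,1250
                               ...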
Implementation
SQL parsing
The string representing the SQL sentence is parsed using a custom-made, 100% hard-coded SQL parser. There is nothing new to learn here and it's definitely not the proper way to do it if you want to cover all SQL features.
//Create config
Configuration mainConf = new Configuration();
//Parse sql
SqlParser sqlParser = new SqlParser(sql, true);
//Store original sql string as a config parameter
mainConf.set("sql", sql);
You can take a look at the parser
here.
The string is parsed and stored in internal objects that are easy to interact with. This is done in the main class where jobs are being assembled.
[Figure: Class diagram for the classes used to store SQL components after parsing has been performed]
These objects are also needed inside the mappers and reducers. The correct thing to do would be to serialize them (for example as a plain JSON string) and send that to the mappers and reducers as a configuration parameter.
However, instead of doing that I'm just passing the original SQL string to each mapper and reducer and then parsing it again there (without running the validations). It's simply easier to parse everything again than to deal with serialization (although it's slower).
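For example, the Mapper can rebuild the parsed structure during its setup phase along these lines (a sketch; passing false as the second argument to skip validations is my assumption, based on the constructor call shown above):
@Override
public void setup(Context context) throws IOException, InterruptedException {
    //Read the original SQL string back from the job configuration
    String sqlString = context.getConfiguration().get("sql");
    //Parse it again, this time without running validations
    sql = new SqlParser(sqlString, false);
    super.setup(context);
}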
First job setup
I create a new Map-Reduce job:
//Create config
Configuration mainConf = new Configuration();
//Create job
Job mainJob = Job.getInstance(mainConf, "SqlEngine-Main");
//Set Map and Reduce
mainJob.setOutputKeyClass(Text.class);
mainJob.setOutputValueClass(Text.class);
mainJob.setJarByClass(SqlEngine.class);
mainJob.setMapperClass(SqlEngineMapper.class);
mainJob.setReducerClass(SqlEngineReducer.class);
I then specify the main table as the input of this job:
//Add main table to input
Path mainTablePath = new Path(basePath + "/" + sqlParser.mainTable);
FileInputFormat.addInputPath(mainJob, mainTablePath);
Then I add all reference tables to the distributed cache:
//Add secondary tables to distributed cache
for (SqlParser.Join join : sqlParser.joins) {
//Path to table dir
Path joinTableDir = new Path(basePath + "/" + join.table + "/");
//Check how many files we have inside that dir
FileStatus[] fileStatus = hdfs.listStatus(joinTableDir);
if(fileStatus.length == 0) {
//error, table is empty
} else if(fileStatus.length == 1) {
//Add file to distributed cache
mainJob.addCacheFile(fileStatus[0].getPath().toUri());
} else {
//There are many files, merge them all in one temp file
Path tmpMergeFile = new Path(tmpDir + "/" + join.table + ".tmp");
Utils.mergeFiles(hdfs, joinTableDir, tmpMergeFile);
mainJob.addCacheFile(tmpMergeFile.toUri());
}
}
The directory for a specific table may contain more than one file. In that case I have to load them all in the distributed cache and then treat them as only one table in the Mapper. To simplify that part I'm just creating a temporary file where I merge all the files into one.
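To give an idea of what that merge step involves, here is a minimal sketch of how such a helper could be written (the real Utils.mergeFiles in the download may differ):
//Assumes imports from org.apache.hadoop.fs.* and org.apache.hadoop.io.IOUtils
public static void mergeFiles(FileSystem hdfs, Path srcDir, Path dstFile) throws IOException {
    //Create (or overwrite) the destination file and append every source file to it
    try (FSDataOutputStream out = hdfs.create(dstFile, true)) {
        for (FileStatus status : hdfs.listStatus(srcDir)) {
            if (status.isFile()) {
                try (FSDataInputStream in = hdfs.open(status.getPath())) {
                    IOUtils.copyBytes(in, out, 4096, false);
                }
            }
        }
    }
}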
Finally the first job is executed:
//Execute
mainJob.waitForCompletion(true);
Mapper
The first Job Mapper will receive the following data as input:
- Line by line of the main table files.
- Reference tables in distributed cache.
- SQL parsed structure: tables that we need, columns, joins, etc.
The output of the Mapper is:
- Key: Text, will contain the values of the columns specified in SELECT and GROUP BY separated by a delimiter: column1,column2,column3
- Value: Text, if we don't require a GROUP BY then it will be an empty String. Otherwise I'll be outputting the values of the columns used in the aggregate functions of the SELECT (more on that later). The content is also separated by a delimiter: column1,column2,column3
Join
Joins are performed in the Mapper as its first step. The idea is to check whether the current row of the main table matches any row of the reference tables I have in memory.
So first, the reference tables that I put in the distributed cache must be loaded into memory during the Mapper setup phase:
public void setup(Context context) throws IOException, InterruptedException {
//Load tables in memory
FileSystem hdfs = FileSystem.get(context.getConfiguration());
tables = new ArrayList<CachedTable>();
if(context.getCacheFiles() != null) {
for (URI uri : context.getCacheFiles()) {
tables.add(new CachedTable(hdfs, new Path(uri), columnSeparator));
}
}
super.setup(context);
}
CachedTable is just a helper class that reads a file from HDFS and stores each of its lines as a row in an in-memory list:
public class CachedTable {
String table;
List<Record> rows;
public CachedTable(FileSystem hdfs, Path path, String sep) {
rows = new ArrayList<Record>();
//Obtain table name
table = path.getName();
table = table.substring(0, table.lastIndexOf('.'));
//Reader HDFS file
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(hdfs.open(path)));
String line;
//Store each line in memory as an instance of Record
while((line = reader.readLine()) != null) {
rows.add(new Record(line, sep));
}
} catch (IOException e) {
throw new RuntimeException("Error loading table in memory: " + path, e);
} finally {
try {
if(reader != null) {
reader.close();
}
} catch (IOException e) {
}
}
}
}
Record is another helper class that splits a line and stores its columns:
public class Record {
int columnsCount;
String[] columns;
public Record(String line, String sep) {
String[] split = line.split(sep);
columnsCount = split.length;
columns = new String[columnsCount];
for (int i = 0; i < split.length; i++) {
columns[i] = split[i].trim();
}
}
}
With all that in place, I have to check whether the current row satisfies its join conditions with any row of the reference tables.
For example, if I have:
FROM sale JOIN store ON sale.2 = store.0
I have to loop through the entire store table that we have in memory and test whether column index 0 of any of those rows is equal to column index 2 of the current sale row.
[Figure: Performing a JOIN between the main table and a reference table]
If there is no row in store that satisfies the join condition then the Mapper skips the current row. If a match is found I take both rows, the current one and the one found in the reference table, and add them to a Map. Those rows hold all the columns that I am going to use to perform WHERE, SELECT and GROUP BY.
[Figure: Map storing the two rows that have been joined]
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//Current row of the main table
Record currentRow = new Record(value.toString(), columnSeparator);
//Joins: search in cached tables and merge records into one
Map<String, Record> joinedRows = new HashMap<String, Record>();
joinedRows.put(sql.mainTable, currentRow);
for (Join join : sql.joins) {
CachedTable joinTable = getTable(join.table);
for (JoinClause joinClause : join.joinClauses) {
String currentRowValue = currentRow.getValue(joinClause.localColumn);
//Perform a full table search by that column
Record joinRow = joinTable.searchByColumn(joinClause.foreignColumn.column, currentRowValue);
//Join not satisfied, skip the current row
if(joinRow == null) {
return;
}
joinedRows.put(join.table, joinRow);
}
}
}
public class CachedTable {
//Full table scan
public Record searchByColumn(int column, String value) {
for (Record row : rows) {
if(row.getValue(column).equals(value))
return row;
}
return null;
}
}
Where
Where filters are also applied in the Mapper. They are done after joins. The Map with joined records is used to pick up the columns that we need to test in WHERE conditions.
There is nothing special in this: I evaluate each condition and, if it is not satisfied, I skip the current row.
These tests are performed recursively. There are two kinds of WHERE conditions:
- Simple condition: =, !=, >, >=, <, <= and LIKE.
- Compound condition: made of two conditions (simple or compound) and a conditional operator (AND, OR)
It's similar to the Composite design pattern.
We also perform an early-out if the left condition of a compound condition is not satisfied and the operator is AND.
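For example, a clause like WHERE sale.2 = '5' AND store.1 LIKE 'bos' (a made-up condition) would be stored as the following tree, and the left branch is evaluated first so the LIKE test can be skipped when the equality already fails:
WhereCompoundClause (AND)
├── WhereSimpleClause: sale.2 = '5'
└── WhereSimpleClause: store.1 LIKE 'bos'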
//Where: apply filters
if(sql.whereClause != null) {
//Apply nested filters recursively
boolean result = applyWhereClauseRecursive(joinedRows, sql.whereClause);
//Skip current line if the filters were not satisfied
if(!result) {
return;
}
}
private boolean applyWhereClauseRecursive(Map<String, Record> joinedRows, WhereClause clause) {
//Compound expression
if(clause instanceof WhereCompoundClause) {
WhereCompoundClause compClause = (WhereCompoundClause)clause;
//Apply left clause first
boolean leftResult = applyWhereClauseRecursive(joinedRows, compClause.leftClause);
//Early abort if using an AND operator
if(!leftResult && compClause.operator == ConditionalOperator.AND)
return false;
//Apply right clause
boolean rightResult = applyWhereClauseRecursive(joinedRows, compClause.rightClause);
//Apply conditional operator
boolean result;
if(compClause.operator == ConditionalOperator.AND) {
result = leftResult && rightResult;
} else {
result = leftResult || rightResult;
}
return result;
//Final expression
} else {
//Get value to compare
WhereSimpleClause simpleClause = (WhereSimpleClause)clause;
String colValue = joinedRows.get(simpleClause.column.table).getValue(simpleClause.column.column);
//Apply operator
boolean result = false;
switch (simpleClause.operator) {
case EQUALS:
result = colValue.toLowerCase().equals(simpleClause.value);
break;
case NOT_EQUALS:
result = !colValue.equals(simpleClause.value);
break;
case LIKE:
result = colValue.toLowerCase().contains(simpleClause.value);
break;
case GREATER:
result = colValue.compareTo(simpleClause.value) > 0;
break;
case GREATER_EQ:
result = colValue.compareTo(simpleClause.value) >= 0;
break;
case LOWER:
result = colValue.compareTo(simpleClause.value) < 0;
break;
case LOWER_EQ:
result = colValue.compareTo(simpleClause.value) <= 0;
break;
}
return result;
}
}
Select
SELECT is performed in the Mapper after WHERE.
I loop through the columns that have been specified in the SELECT part of the SQL sentence and I pick the value of those columns from the Map with joined records.
I output those values as the Reducer KEY, separated by a delimiter.
//Select: leave only the columns that we want to see. Put them in reducer key
List<String> keyColumns = new ArrayList<String>();
for (SelectColumn selectColumn : sql.selectColumns) {
//Constant: just add the value
if(selectColumn.type == SelectColumnType.CONSTANT) {
keyColumns.add(selectColumn.constant);
//Regular column: add column value
} else if(selectColumn.type == SelectColumnType.COLUMN) {
String colValue = joinedRows.get(selectColumn.column.table).getValue(selectColumn.column.column);
keyColumns.add(colValue);
}
}
Group by
I put the restriction that the GROUP BY columns must match the SELECT columns (except for aggregation functions), so the grouping has already been solved in the previous step: all values with the same key will go to the same Reducer call, and that way I can compute the aggregation functions there.
The only thing that remains to do in the Mapper is to output the values that we need in the Reducer for the aggregate functions. I output those columns in the VALUE for the Reducer, separated by a delimiter:
//Aggregation columns: put them in reducer value
List<String> valueColumns = new ArrayList<String>();
for (SelectColumn selectColumn : sql.selectColumns) {
if(selectColumn.type == SelectColumnType.AGGREGATE) {
String colValue = joinedRows.get(selectColumn.column.table).getValue(selectColumn.column.column);
valueColumns.add(colValue);
}
}
Reducer
The Reducer receives the following as input:
- Key: the Group By or Select columns I outputted in the Mapper
- Values: the values of each column that we need in order to compute the aggregation functions.
The output will be:
- Key: all the columns that the user wants to see (separated by a delimiter)
- Value: an empty String (I don't care about the value)
If the SQL sentence doesn't require Group By then I can just output what I received as the key. Otherwise I have to compute aggregation functions. I loop through each aggregation function specified in SELECT and update the value of the required calculation:
//Parse grouping columns from key
Record groupColumns = new Record(key.toString(), columnSeparator);
//Init all aggregate values
int count = 0;
double sum = 0;
double max = -Double.MAX_VALUE; //Note: not Double.MIN_VALUE, which is the smallest positive double
double min = Double.MAX_VALUE;
//Loop through each value that we have to aggregate
for (Text aggregateItem : values) {
Record aggregateColums = new Record(aggregateItem.toString(), columnSeparator);
//Loop trough aggregate columns
int aggIndex = 0;
for (SelectColumn c : sql.selectColumns) {
if(c.type == SelectColumnType.AGGREGATE) {
String colValue = aggregateColums.getValue(aggIndex);
//Count
if(c.function == AggregateFunction.COUNT || c.function == AggregateFunction.AVG) {
count++;
}
//Sum
if(c.function == AggregateFunction.SUM || c.function == AggregateFunction.AVG) {
double v = Double.parseDouble(colValue);
sum += v;
}
//Max and Min
if(c.function == AggregateFunction.MAX || c.function == AggregateFunction.MIN) {
double v = Double.parseDouble(colValue);
if(v > max) max = v;
if(v < min) min = v;
}
aggIndex++;
}
}
}
The last step is to output the columns that the user wants to see, including results from aggregation functions, and in the proper order:
//Generate final list of columns
List<String> resultColumns = new ArrayList<String>(sql.selectColumns.size());
int groupColIndex = 0;
for (SelectColumn c : sql.selectColumns) {
//Add constant or column value
if(c.type == SelectColumnType.CONSTANT || c.type == SelectColumnType.COLUMN) {
resultColumns.add(groupColumns.getValue(groupColIndex));
groupColIndex++;
//Add aggregate calculation
} else if(c.type == SelectColumnType.AGGREGATE) {
switch (c.function) {
case COUNT:
resultColumns.add(String.valueOf(count));
break;
case SUM:
resultColumns.add(Utils.printDouble(sum));
break;
case MAX:
resultColumns.add(Utils.printDouble(max));
break;
case MIN:
resultColumns.add(Utils.printDouble(min));
break;
case AVG:
double avg = sum / (double)count;
resultColumns.add(Utils.printDouble(avg));
break;
}
}
}
outputWritable.set(getColumsOutput(resultColumns, columnSeparator));
Order by
To perform the ORDER BY I use a second job. This job reads the output generated by the previous job and proceeds to sort it.
The Mapper doesn't do anything important (it's just an identity-style mapper that emits every row under the same fixed key). The Reducer receives all the rows and performs an in-memory sort:
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//Load all rows in memory
List<Record> rows = new ArrayList<Record>();
for (Text text : values) {
rows.add(new Record(text.toString(), columnSeparator));
}
//Sort rows based on order by columns
Collections.sort(rows, new Comparator<Record>() {
@Override
public int compare(Record a, Record b) {
int result = 0;
for (SortClause sortClause : sql.sortClauses) {
String value1 = a.getValue(sortClause.index);
String value2 = b.getValue(sortClause.index);
result = value1.compareTo(value2);
if(result != 0) {
result = sortClause.order == SortOrder.ASC ? result : -result;
return result;
}
}
return result;
}
});
//Output sorted rows
for (Record row : rows) {
outputWritable.set(row.toString());
context.write(outputWritable, empty);
}
}
It must be noted that, in order for the previous solution to work properly, only one Reducer must be used. That must be specified when creating the job:
sortJob.setNumReduceTasks(1);
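Put together, the setup of this second job would look roughly like this (a sketch; the SortMapper/SortReducer class names and the path variables are assumptions):
//Create the sorting job
Configuration sortConf = new Configuration();
sortConf.set("sql", sql);
Job sortJob = Job.getInstance(sortConf, "SqlEngine-Sort");
sortJob.setJarByClass(SqlEngine.class);
sortJob.setMapperClass(SortMapper.class);          //identity-style mapper, emits a fixed key
sortJob.setReducerClass(SortReducer.class);        //performs the in-memory sort
sortJob.setMapOutputKeyClass(IntWritable.class);
sortJob.setMapOutputValueClass(Text.class);
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(Text.class);
//Read the output of the first job, write the final result
FileInputFormat.addInputPath(sortJob, firstJobOutputPath);
FileOutputFormat.setOutputPath(sortJob, finalOutputPath);
//Only one Reducer so that all rows are sorted together
sortJob.setNumReduceTasks(1);
sortJob.waitForCompletion(true);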
This solution will of course not work if the rows to sort cannot fit into memory. A better approach would be to use a custom partitioner and divide the load into chunks.
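For illustration, a range partitioner along these lines could let several Reducers sort disjoint chunks, assuming the Mapper emitted the first sort column as the key instead of a fixed value (this is not part of the current implementation):
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class SortRangePartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        //Bucket rows by the first character of the sort key so each Reducer
        //receives a contiguous range that it can sort independently
        String k = key.toString();
        char first = k.isEmpty() ? 0 : k.charAt(0);
        return (first * numPartitions) / 65536;
    }
}
The globally sorted result would then simply be the concatenation of the Reducer outputs in partition order.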
Conclusion
The full source code can be downloaded from here:
DOWNLOAD
There is still a lot to improve in the current implementation. More SQL features need to be implemented for it to become a general solution useful in many situations.
The restriction of having to load all the joined tables in memory is too strong for many cases. Alternative join mechanisms should be provided.
Also, ORDER BY should be re-implemented to support arbitrarily large results.
However, the insight gained from doing my own implementation was very useful for understanding how to do performance tuning on existing solutions like Hive, Pig and Impala.
References