Hive JDBC Storage Handler

Qubit enables our clients to unlock the power of their data - across silos and locations, at low latency and immense scale. Early iterations of our platform leveraged the Apache Hive project extensively. Whilst we have since mostly moved on to newer platforms, we'd like to contribute back to the community the work we've done, as we think it could be useful to others.

One of the requirements we came across when working with complex Hive pipelines was the need to join several disparate data sources. Most of these were huge, terabyte-sized collections of files stored in HDFS, but we also had small, highly dynamic collections of records stored in MySQL. Whilst it would have been fairly easy to periodically dump the necessary MySQL tables into HDFS using a tool like Sqoop, we did not want to take that approach, for several reasons:

  1. The source data was highly dynamic, so we would have needed to run the ETL process at a fairly high frequency to avoid using stale data

  2. ETL processes, no matter how trivial, add monitoring and management overhead for operations teams

  3. Storing large numbers of small files in HDFS is an anti-pattern, as each file consumes NameNode memory

One way to address these problems is to let Hadoop read directly from a JDBC data source as if it were local. Since most of our data was already in a Hive data warehouse, we specifically wanted Hive to be able to treat a JDBC data source as an external table. Given the prevalence of SQL databases in the enterprise, we expected this functionality to already exist, but surprisingly it did not.

The official Hive issue to create a JDBC storage handler, HIVE-1555, is still open and has gone unresolved for four years at the time of writing. The only other reference to a Hive JDBC Storage Handler (HJSH) we found at the time was embedded deep inside the WSO2 Business Activity Monitor bundle. Attempting to find the source (a tarball over 2 GB in size, not maintained in a public source code repository), isolate the implementation from proprietary dependencies and make it work with our standalone version of Hive turned out to be a lot more difficult than we anticipated. Furthermore, we did not want to rely on what was clearly a helper utility built into an enterprise product, whose update and bug-fix schedule was likely to be dictated by the velocity of development of the main product.

So we decided to develop our own version of a JDBC storage handler for Hive, which we have used internally for more than two years. It has proven to be an invaluable tool for Data Scientists and Engineers alike, used in ways we never would have anticipated when we started the project. Today we are happy to release it publicly as an open source project.

Qubit HJSH

Qubit HJSH can be used to create an external table as follows:

CREATE EXTERNAL TABLE part_ext
(
  id STRING,
  part_name STRING,
  warehouse_id STRING,                
  created STRING
)
STORED BY 'com.qubitproducts.hive.storage.jdbc.JdbcStorageHandler' 
TBLPROPERTIES (
  "qubit.sql.database.type" = "MySQL",
  "qubit.sql.jdbc.url" = "jdbc:mysql://localhost:3306/qubit?user=qubit&password=qubit",
  "qubit.sql.jdbc.driver" = "com.mysql.jdbc.Driver",
  "qubit.sql.query" = "SELECT part_id,part_name,warehouse_id,created_datetime FROM parts",
  "qubit.sql.column.mapping" = "id=part_id, created=created_datetime:date"
);
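
Once defined, the external table behaves like any other Hive table. For example, it can be joined directly against an HDFS-backed table (the orders table below is hypothetical and included purely for illustration):

SELECT p.part_name, COUNT(*) AS order_count
FROM orders o
JOIN part_ext p ON (o.part_id = p.id)
GROUP BY p.part_name;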

As MySQL and most other relational databases have a richer type system than Hive, HJSH supports mapping data types between the two systems. Currently this conversion is implemented only for date types; however, it can easily be extended to support other special types. The column mapping format is shown below.
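
Each entry in the qubit.sql.column.mapping property pairs a Hive column with a source column and, optionally, a type conversion. Generalising from the example above, the format is:

"qubit.sql.column.mapping" = "hive_column=source_column[:type], ..."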

The distinguishing feature of Qubit HJSH is its support for Hive's filter pushdown mechanism. This allows the source query to be rewritten at runtime, which significantly reduces the amount of data transferred from the source database and makes better use of the indexes and other optimisations already present at the source. To illustrate, given the above external table definition, a Hive query of the form

SELECT id FROM part_ext WHERE created > '2014-01-01'

will be converted to the MySQL query

SELECT part_id FROM parts
WHERE created_datetime > str_to_date('2014-01-01', '%Y-%m-%d')

Limit clauses are natively supported as well. For example:

SELECT * FROM part_ext LIMIT 10

will be converted to the MySQL query

SELECT * FROM parts LIMIT 10
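
The two pushdowns would also be expected to compose, so that a query combining a predicate with a limit, such as

SELECT id FROM part_ext WHERE created > '2014-01-01' LIMIT 10

ships both the rewritten predicate and the LIMIT clause to MySQL in a single query.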

Using filter pushdown, Hive queries can run much faster because the filtering is off-loaded to the source database, which can use its indexes to evaluate predicates far more efficiently than a full scan in Hive. Less data has to be transferred over the network, reducing round-trip latency and network chatter.

HJSH uses DBCP connection pooling to ensure that heavy Hive queries do not inundate the MySQL server with connections. Furthermore, the JDBC fetch size can be set at runtime to tune the network and memory overhead of fetching large volumes of data.
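
As a sketch of how this might look, the fetch size could be set from the Hive session before running a query. The configuration key below is hypothetical, shown only to illustrate the idea; see the project README for the exact name:

-- Hypothetical configuration key, for illustration only
SET qubit.sql.jdbc.fetch.size=1000;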

The HJSH implementation has been tested with Hive 0.10 as shipped in the Cloudera CDH 4.3.0 release, and with MySQL Connector/J version 5.1.21.

We hope the community finds this release useful. Full source code is available at https://github.com/QubitProducts/hive-jdbc-storage-handler. Contributions are very welcome.