This document is aimed at Data Architects and Developers interested in the optimization techniques that Denodo applies for scenarios that require integrating data sources containing large volumes of data. This is the case, among others, of the Logical Data Warehouse and Logical Data Lake scenarios, where the data relevant for analytic purposes is distributed across several different data sources. This document focuses on how the new optimizations added in Denodo 7.0 can be used to deal with these scenarios.
In this document we will follow a step-by-step tutorial on how to prepare a dataset and configure the environment to test the new Denodo optimizations.
Let’s consider the example scenario of a big retailer which sells products to customers. The customer's data is stored in a conventional relational database. For each customer, the database stores its ‘identifier’, ‘name’, ‘surname’, ‘address’, etc.
The retailer also has a Data Warehouse (DW) with sales information. For each sale, the Warehouse stores the ‘identifier’ of the sold product, the ‘sale’ date and the ‘sale price’. The Data Warehouse stores only sales from the last year. Sales data from previous years is periodically offloaded to a Hadoop-based repository running Impala (or a similar system). The fields stored in Impala for each sale are the same as in the Data Warehouse.
To simplify the configuration of the environment for the test, we can use a Impala cluster instead of the three systems mentioned. The datasets will be created on that cluster, in a real world scenario they would be distributed as described above. Take into account that even with this approach, the configuration in Denodo will be done as if they were three separate systems. This could affect the performance of the tests, which would be better in a real scenario with the load distributed between systems.
For this example, the datasets from TPC-DS will be used, TPC-DS is the most important standard industry benchmark for decision support systems. The TPC-DS schema models the sales and sales returns processes of an organization that employs three primary sales channels: store, catalogs and the Internet. The schema includes 7 facts tables and 17 dimension tables, but in this tutorial, only the following ones will be used:
Facts table: store_sales (2,879,987,999 rows).
Dimension table: customer (12,000,000 rows).
Dimension table: date_dim (73,000 rows).
NOTE: TPC-DS can be scaled to different data sizes. The number of rows shown assume TPC-DS has been scaled to a total size of 1 Tbyte.
After registering in order to download the TPC-DS data generation tool, called DSGen, the datasets can be generated by following the instructions from the How_To_Guide.doc file (under the /tools folder of the TPC-DS distribution).
Once the tables have been created, we are going to partition the sales table in two new tables: recent sales and previous years sales. To do that, we can execute this command in Impala to create the recent sales table, assuming that the most recent sales are from the year 2003:
CREATE TABLE store_sales_current STORED AS PARQUET AS
SELECT * FROM store_sales WHERE ss_sold_date_sk IN (SELECT d_date_sk FROM date_dim WHERE d_year = 2003)
And this command to create the previous years sales table:
CREATE TABLE store_sales_old STORED AS PARQUET AS
SELECT * FROM store_sales WHERE ss_sold_date_sk IN (SELECT d_date_sk FROM date_dim WHERE d_year < 2003)
In this section we will explain how to install and configure the bulk load mechanisms required for having a fast communication between the MPP system and Denodo.
Unzip the downloaded file. We will reference the folder as <HADOOP_HOME>.
Set the following environment variables in the system:
Note: If there are different Hadoop-based systems in the same server that require different user names, instead of using an environment variable, the user name can be set in a script which would be invoked from the data source configuration in Denodo.
Take into account that the Hadoop user name specified must have read/write privileges in the HDFS folder that will be used to upload files.
Make sure that the Java version in the system is valid to execute the Hadoop client scripts. The scripts use the JRE referenced in the JAVA_HOME environment variable. Due to a limitation in the Hadoop.cmd configuration, you need to specify a JAVA_HOME that doesn’t have spaces in its path
Note: From version 2.7.0 Java 7 or higher is required to execute Hadoop client.
The following subsections we will explain the specific details to configure the MPP with the different systems supported in Denodo:
For Impala systems, the default Hadoop username that should be set in the HADOOP_USER_NAME property is “impala”.
Denodo does not distribute the Impala JDBC driver. It can be downloaded from here: https://www.cloudera.com/downloads/connectors/impala/jdbc.html
Note: To avoid certain known issues with older drivers, the Impala JDBC driver should be 2.5.31 or newer.
Unzip the 4.1 version driver and place all the libraries in the folder <DENODO_HOME>/lib-external/jdbc-drivers/impala-2.3:
When using a Spark system as MPP, Denodo uploads the query data to the HDFS metastore and then issues a query through the SparkSQL engine using the JDBC interface provided by the Spark Thrift server. The query results are collected in the Thrift server before sending them back to Denodo.
The default behavior of the Thrift server is to gather all the results of the query before returning them, this can lead to memory issues and long response times with queries that return a large number of rows. To avoid this issues, the following property can be added to the Spark Thrift server configuration (spark-defaults.conf file):
The default user name for the Hadoop script when connecting to a Spark system is ‘hive’. The Spark JDBC driver is distributed in the Denodo installation.
The default user name for the Hadoop script when connecting to a Presto system is ‘hive’. Denodo distributes the Presto JDBC driver.
When using a production-ready cluster with Presto, no additional configuration is needed. If testing with a Presto VM, it is usually required to enable external access to the HDFS system. To do so, edit the file /etc/hadoop/conf/core-site.xml and modify the value of the property fs.defaultFS to use 0.0.0.0.
this will allow connections from external clients like Denodo.
To implement the scenario described above in Denodo, we are going to create the partitioned union for the store_sales view. Follow these steps:
To test the pushdown of operations for parallel processing we will use a query that computes the total sales by customer, including sales from the current year and also previous years:
c_customer_sk AS c_customer_sk,
sum(store_sales.ss_list_price) AS total_sales
customer AS customer
store_sales AS store_sales
ON customer.c_customer_sk = store_sales.ss_customer_sk
A naïve plan to execute this query would be to access to the three systems (customer database, current sales database, historic sales database), perform the join between customer and the total sales, and then group by customer:
This plan is very inefficient because it has to transfer through the network the complete sales information.
The VDP optimizer will optimize the query to perform a full aggregation push down to both branches of the sales databases:
With this optimization, there are approximately 36 million rows transferred through the network, the 12 millions customers and 12 millions each from each sales database. But VDP would have to perform a join with 12 and 24 million rows in each branch respectively, and then aggregate the 24 million rows resulting from the join.
This is a much better approach, but the join and group by operations still have to process a high volume of data, which will require the use of swapping to disk to avoid excessive memory consumption.
It is possible to improve the execution of this query by leveraging the use of Massive Parallel Processing (MPP). To be able to push the processing of operations to cache, it must be configured using one of the supported cache adapters for parallel processing.
First, the MPP acceleration must be activated at server level. This can be done in the Queries optimization section of the server configuration dialog. It is possible to select the cache data source or a specific JDBC data source. The selected source must be one of the supported engines for MPP acceleration.
For our tests, select the option “Use cache data source”:
Now, we have to enable and configure the cache adapter. In the connection settings, we will use one of the adapters with support for MPP query acceleration.
In the Read & Write section of the cache configuration dialog, configure the write settings, specifying the path to the Hadoop executable and the HDFS URI.
For reference, these are some typical HDFS URI used in the supported systems. Check with the systems administrator which should be used in a production scenario:
Also, enable the MPP acceleration in the query optimization settings:
IMPORTANT NOTE: To obtain the best performance, it is critical to to ensure a fast data transfer speed between VDP and the MPP system. To achieve that, they should be in the same network segment. It is also necessary to enable and configure the database bulk load mechanisms.
If the same query is executed with the MPP enabled, the optimizer will probably choose to use the full aggregation pushdown optimization to aggregate the sales information in each database, and then send the results of the aggregation and the customer information to the MPP cluster. Then the whole query will be executed in the cluster taking advantage of the parallelism. The execution plan in Denodo will look like this:
In this section we describe some of the most common issues that might happen while testing MPP query acceleration.
The query plan indicates that the query will use MPP, but the actual query does not use it.
This normally occurs because there is some problem with the uploading of the files to HDFS. It is possible that some error happened when executing the Hadoop client. It is also possible that the user specified in the Hadoop script does not have privileges to upload files to the HDFS URI specified.
These type of errors will appear in the VDP server log file. For example, a permissions error:
com.denodo.vdb.util.tablemanagement.sql.insertion.HdfsInsertWorker  - ERROR >mkdir: Permission denied: user=hduser, access=WRITE, inode="/user/hive":hive:hive:drwxrwxr-t
Or if the path to the Hadoop executable is not correctly configured:
com.denodo.vdb.util.tablemanagement.sql.insertion.HdfsInsertWorker  - Error uploading file
com.denodo.vdb.util.tablemanagement.TableManagerException: java.io.IOException: Cannot run program "C:\hadoop-2.8.2\bin\hadoop.cmd": CreateProcess error=2, The system cannot find the file specified
A query using the MPP does not return rows
This could happen if the files are uploaded to HDFS using the credentials of a user and the MPP engine is executed with a different user that does not have permissions to access those files. To avoid this issue, the files should be uploaded with the appropriate user name, configured in the Hadoop script.
The query optimizer does not choose to use the MPP query accelerator
The MPP query acceleration will be used only when other optimization techniques are not possible. The query optimizer will try to use techniques such as aggregation pushdown or data movement to join leafs, that might push most of the processing to the data sources. That would make that the volume of data that has to be processed in Denodo is low, which would not require MPP acceleration.
In other cases, if the query requires significant processing resources in Denodo, and MPP is not being used check the following: