Denodo Query Optimizations for the Logical Data Warehouse (Part 2): Working With Partitioned Fact Tables

Applies to: Denodo 7.0, Denodo 6.0
Last modified on: 24 May 2018
Tags: Best practices Cost optimization Data movement Optimization Performance


Overview

This document is aimed at Data Architects and Developers interested in the new optimization techniques that Denodo applies for scenarios that require integrating data sources containing large volumes of data. This is the case, among others, of Logical Data Warehouse and Logical Data Lake scenarios, where the data relevant for analytic purposes is distributed across several different data sources. In these scenarios, it is common to partition fact tables across several systems according to some criteria. This document focuses on how to use the Denodo Platform to deal with these scenarios. 

This document includes:

  • A description of the optimization techniques that Denodo applies when working with partitioned fact tables.
  • A step-by-step tutorial on how to configure Denodo to automatically apply these optimizations.

Introduction

This is the second document in a series about the query optimizations that Denodo applies in the context of a Logical Data Warehouse / Data Lake scenario. Read the first one for more information about these optimizations and for some configuration steps that this document references. This document focuses on the optimizations applied when working with partitioned fact tables.

Logical Data Warehouse Scenario

Let’s consider the example scenario of a big retailer that sells products to customers. The customer data is stored in a conventional relational database. For each customer, the database stores an ‘identifier’, ‘name’, ‘surname’, ‘address’, etc. The retailer also has a Data Warehouse (DW) with sales information. For each sale, the warehouse stores the ‘identifier’ of the sold product, the ‘sale date’ and the ‘sale price’. The Data Warehouse stores only the sales from the last year; sales data from previous years is periodically offloaded to a Hadoop-based repository running Impala (or a similar system). The fields stored in Impala for each sale are the same as in the Data Warehouse. You can see a schema of this architecture in Figure 1.

This pattern is commonly known as ‘Data Warehouse Historical Offloading’ (or ‘Cold Data Storage’), where only the most current data (e.g. last year) is in the Enterprise Data Warehouse (EDW) and the historical data is offloaded to a Hadoop cluster. Some of its characteristics are the following:

  • Cold data (that is infrequently used) is copied to a Hadoop store (cheaper solution).
  • For consuming applications, the hybrid data warehouse appears as a single data store.
  • In Denodo, this situation can be modelled by creating a “virtual fact table” using a UNION view with selections based on the partitioning conditions (e.g. date). We will see examples below.
  • Typical queries join the “virtual fact table” with one or several dimensions and perform an aggregation on top.
  • Queries on current data only need to go to the DW, while queries over longer historical time spans also need data from Hadoop.
  • The Data Virtualization engine applies query partitioning techniques (branch pruning) to remove unnecessary query branches for better performance.

Figure 1 Data Warehouse Historical Offloading

Tutorial

This section provides a step-by-step tutorial on how to set up a sample Logical Data Warehouse scenario and how to appropriately configure Denodo for it. The tutorial is divided into the following sections:

  1. Preparing the data sets: get the data sets and load them into the different data sources used for the example.
  2. Preparing Denodo: implement the data model in Denodo and configure the views to ensure they can use the relevant optimizations.
  3. Executing the queries: execute several queries to illustrate several scenarios and the optimizations applied for each one.

Preparing the Data Sets

As in the previous document, we are going to use data sets from the standard industry benchmark TPC-DS. In this case, we are going to partition the fact table store_sales into two systems:

  • Netezza: containing the sales from the current year.
  • Impala: containing the sales from previous years.


Figure 2 Our star-schema model

In addition, as we are going to partition the store_sales view according to the sale date, we also need the date_dim dimension replicated in both data sources.

The dimensions customer and item are loaded in a different database (e.g. Oracle).
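If you want to reproduce the split yourself, the partitioning criterion can be expressed with staging queries like the ones below. This is only an illustrative sketch: the join columns are the standard TPC-DS ones (ss_sold_date_sk / d_date_sk), and the actual loading mechanism (flat-file export, CTAS, an ETL tool, etc.) depends on your environment.

-- Rows for the current year are loaded into Netezza...
SELECT ss.*
FROM store_sales ss
    JOIN date_dim d ON (ss.ss_sold_date_sk = d.d_date_sk)
WHERE d.d_year >= EXTRACT(YEAR FROM CURRENT_DATE);

-- ...and rows for previous years are loaded into Impala
SELECT ss.*
FROM store_sales ss
    JOIN date_dim d ON (ss.ss_sold_date_sk = d.d_date_sk)
WHERE d.d_year < EXTRACT(YEAR FROM CURRENT_DATE);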

Preparing Denodo

To implement the model depicted in Figure 2 in Denodo, we are going to create the partitioned union for the store_sales view. Follow these steps:

  1. Create the data sources and base views for store_sales in Impala and Netezza.

Figure 3 Data sources and base views for store_sales

  2. Create the base views for the date_dim dimension both in Impala and Netezza. For the sake of simplicity, in each of the two data sources we join the store_sales view with the date_dim dimension table (see Figure 4), so that the year of each sale is available as a number and partitioning the fact table becomes easier. In the Output tab of the JOIN wizard, we select the d_year field from the date_dim view and all the fields from the store_sales view.

Figure 4 Joining store_sales with date_dim

  3. Create a selection view over each of the views created in step 2, using the following conditions for each data source (see Figure 5, Figure 6 and Figure 7):
  • Netezza: create the view store_sales_new with the condition d_year >= current year.
  • Impala: create the view store_sales_old with the condition d_year < current year.

This way, queries involving only sales from the current year will hit Netezza, while queries involving sales from before the current year will hit Impala.

Figure 5 store_sales_old condition in Impala

Figure 6 store_sales_new condition in Netezza

Figure 7 Tree view of store_sales_old and store_sales_new

  4. Create a UNION view over the views created in the previous step to get all the sales. We name this view store_sales_all; it is the final sales view we will use in our queries. A VQL sketch of steps 3 and 4 is shown after Figure 8.

Figure 8 Tree view of store_sales_all
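For reference, steps 3 and 4 can also be expressed in VQL. This is only a minimal sketch: the tutorial actually builds these views with the graphical wizards, and the names of the joined views from step 2 (store_sales_netezza_dates and store_sales_impala_dates) are hypothetical. Note also that the graphical UNION view is an extended union; the SELECT-based statement below is only meant to illustrate the partitioning logic.

-- Step 3: selection views over the joined views from step 2.
-- GETYEAR(NOW()) returns the current year (the same expression used later in Figure 13).
CREATE OR REPLACE VIEW store_sales_new AS
    SELECT * FROM store_sales_netezza_dates WHERE d_year >= GETYEAR(NOW());

CREATE OR REPLACE VIEW store_sales_old AS
    SELECT * FROM store_sales_impala_dates WHERE d_year < GETYEAR(NOW());

-- Step 4: partitioned union of both selection views
CREATE OR REPLACE VIEW store_sales_all AS
    SELECT * FROM store_sales_new
    UNION
    SELECT * FROM store_sales_old;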

Furthermore, Denodo uses a variety of information for query optimization, including statistics of the views’ data, information about the available indexes in the data sources, primary keys, and the referential constraints defined in Denodo. It also takes into account some configuration parameters describing certain features of the data sources.

In order for Denodo to apply these optimizations, the following steps must be performed, as explained in detail in the previous article in this series.

  1. Declaring Primary Keys
  2. Collecting Statistics
  3. Declaring Data Movement Targets
  4. Declaring Indexes in Denodo

Executing Queries

Now let’s see how to execute some queries in the sample scenario and how the optimizations work in practice.

Total Sales by Customer

Let us start with a query that applies an optimization technique (‘aggregation pushdown’) already described in the first article of this series. The query computes the total sales by customer (see Figure 9). In this example, the Denodo VDP Optimizer follows these steps:

  1. It delegates the GROUP BY operation to each partition of store_sales (i.e., store_sales_old and store_sales_new).
  2. It performs the UNION and then joins the results with the tuples from the customer view to add the rest of the projected fields (see Figure 11).
  3. It applies the GROUP BY operation to group the sales coming from each branch (old and new).

SELECT
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address,
    SUM(ss_ext_sales_price) year_total,
    's' sale_type
FROM customer
    INNER JOIN store_sales_all
    ON (c_customer_sk = ss_customer_sk)
GROUP BY
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address

Figure 9 Raw SQL query for total_sales_by_customer view

Figure 10 Tree view for total_sales_by_customer view

Figure 11 Query Plan for total_sales_by_customer view

In the image above, the left branch of the JOIN gets the tuples from the store_sales_all view. Notice how the GROUP BY operation is completely pushed down to each partition. The right branch of the JOIN retrieves the customers. Also notice how the execution plan shows that the “Aggregation Push-down” optimization has been applied.
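To make the pushdown more concrete, the query delegated to each partition has roughly the shape shown below. This is only an illustrative sketch, not the exact SQL Denodo generates, and it assumes the join defined in Figure 4 uses the standard TPC-DS columns ss_sold_date_sk / d_date_sk.

-- Delegated to the Netezza partition; Impala receives the analogous query
-- with the condition d_year < <current year>
SELECT ss_customer_sk, SUM(ss_ext_sales_price) AS partial_total
FROM store_sales
    JOIN date_dim ON (ss_sold_date_sk = d_date_sk)
WHERE d_year >= <current year>
GROUP BY ss_customer_sk

-- Denodo then unions both partial result sets, joins them with customer, and
-- re-aggregates (SUM of the partial totals) grouping by the customer fields.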

Let’s analyze the number of tuples that are transferred through the network with and without this optimization. We have the following number of rows for each view:

  • Fact table: store_sales_all (287,997,024 rows).
  • New: store_sales_new (598,678 rows).
  • Old: store_sales_old (287,398,346 rows).
  • Dimension table: customer (2,000,000 rows).

Without this optimization, every tuple from each view involved in the query is transferred to Denodo. Therefore, 287,398,346 + 598,678 + 2,000,000 = 289,997,024 rows are moved through the network to be processed by Denodo to get the final result. However, by using the aggregation pushdown optimization, only 1,999,984 + 51,590 + 2,000,000 = 4,051,574 tuples (only 1.4% of the total) are transferred, as shown in Figure 12. The impact on performance is huge, not only because far fewer rows are transferred through the network but also because Denodo only has to process a much smaller subset of the original data set. Each data source performs most of the work locally on its own data. In the end, Denodo only needs to combine the partial results from each data source to generate the final results.

Figure 12 Tuples transferred for total_sales_by_customer view

Last Year’s Sales by Customer

In this scenario (see the SQL query in Figure 13), the query optimizer detects that only the tuples from the EDW (the system that contains the sales from the last year) are needed, because of the filter in the WHERE clause. Therefore, the branch for the historical sales (Impala) is not needed. This enhancement is known as ‘branch pruning’. In the same way, if we were asking only for the sales from previous years, data from Netezza (current year sales) would not be needed.

Figure 14 shows that the optimization applied was ‘Branch Pruning’ and that all the data for store_sales_all is coming from store_sales_new (Netezza). Due to the filtering condition, no data is needed from store_sales_old.

SELECT
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address,
    SUM(ss_ext_sales_price) year_total,
    's' sale_type
FROM customer
    INNER JOIN store_sales_all
    ON (c_customer_sk = ss_customer_sk)
WHERE d_year = getYear(now())
GROUP BY
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address

Figure 13 Raw SQL query for last_year_sales_by_customer view

Figure 14 Query Plan for last_year_sales_by_customer view: only data from Netezza
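For illustration, after pruning, the only query sent to a data source has roughly the shape below (if aggregation pushdown is also applied, as in the previous example). This is an illustrative sketch, not the exact SQL Denodo generates; since customer lives in a different database, the join with customer and the final GROUP BY are still performed by Denodo.

-- Sent to Netezza only; the Impala branch (store_sales_old) is never executed
SELECT ss_customer_sk, SUM(ss_ext_sales_price) AS partial_total
FROM store_sales
    JOIN date_dim ON (ss_sold_date_sk = d_date_sk)
WHERE d_year = <current year>
GROUP BY ss_customer_sk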

Total Sales by Customer Filtered by Country: Data Movement with Partitioning

Another optimization technique available in Denodo is on-the-fly Data Movement. When we need to combine a large data set in one data source with small data set(s) from other data source(s), it may be worthwhile to copy the small data sets to temporary tables in the data source containing the large data set, and then push down the combination operation to it. See the previous document for more details and examples.

The data movement optimization can also work with partitioned fact tables: in this case, when a query combines one or several small data sets with a large partitioned fact table, Denodo’s optimizer might decide to copy the small data set(s) to all the data sources containing partitions of the fact table. This decision depends on the cost of each possible execution plan, which takes into account information about statistics, indexes, etc.

In this scenario (see the SQL query in Figure 15), the Denodo optimizer chooses to move the data from the item dimension to both Netezza and Impala. This way, the JOIN and GROUP BY operations can be pushed down below the UNION, as shown in Figure 16.

SELECT
    i_item_sk,
    SUM(ss_sales_price) sum_agg
FROM store_sales_all
    JOIN item
    ON (store_sales_all.ss_item_sk = item.i_item_sk)
WHERE i_current_price > ss_list_price
  AND i_category = 'Women'
GROUP BY item.i_item_sk

Figure 15 Raw SQL query for total_sales_by_item_price_women view

Figure 16 Data Movement for total_sales_by_item_price_women view

The following images show how data from the item dimension is copied to each partition to favor query delegation to each system.

Figure 17 Execution plan for total_sales_by_item_price_women view

As you can see in the image above, data from item is copied into a temporary table in Netezza. This way, the branch of the query over the current year’s sales can be fully delegated to Netezza.

Figure 18 Execution plan for total_sales_by_item_price_women view

In parallel, data from item is also copied into a temporary table in Impala, so that the branch of the query over the previous years’ sales can be fully delegated to Impala.

Finally, Denodo receives a partial data set from each branch and processes the final GROUP BY to produce the results.
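As a rough illustration of the shape of the queries after the data movement (not the exact SQL Denodo generates; the temporary table name item_tmp is hypothetical and the date_dim join columns are the standard TPC-DS ones):

-- After copying item into a temporary table (here item_tmp) in Netezza, this
-- whole branch can be delegated there; Impala receives the analogous query
-- with d_year < <current year> against its own copy of the temporary table
SELECT i_item_sk, SUM(ss_sales_price) AS partial_sum
FROM store_sales
    JOIN date_dim ON (ss_sold_date_sk = d_date_sk)
    JOIN item_tmp ON (ss_item_sk = i_item_sk)
WHERE d_year >= <current year>
  AND i_current_price > ss_list_price
  AND i_category = 'Women'
GROUP BY i_item_sk

-- Denodo unions both partial results and applies the final GROUP BY
-- (SUM of the partial sums) per i_item_sk.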

Total Sales by Customer: Alternative Sources

When working with logical star schemas where the fact tables are partitioned in several data sources, sometimes some of the dimension tables are replicated in all the data sources containing partitions of the fact table. Denodo can be configured to leverage this in order to maximize query pushdown.

To illustrate this functionality, let’s consider a new scenario where the data from customers_db (which contains the master data) is periodically copied to each data source of the partitioned fact table (Netezza and Hadoop). To test this scenario, you will need to do the same: take the customer data set and load it into the two data sources, as explained in the previous article.

Now we want to compute the total sales by customer, as shown in the query from Figure 19.

SELECT
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address,
    SUM(ss_ext_sales_price) year_total,
    's' sale_type
FROM customer
    INNER JOIN store_sales_all
    ON (c_customer_sk = ss_customer_sk)
GROUP BY
    c_customer_sk,
    c_first_name,
    c_last_name,
    c_preferred_cust_flag,
    c_birth_country,
    c_login,
    c_email_address

Figure 19 Raw SQL query for total_sales_by_customer view

The new scenario is depicted in Figure 20, where you can see that the customers dimension is replicated both in Netezza and the Hadoop cluster (we no longer need the source customers_db).

Figure 20 Customer data replicated in the data source of each partition of the fact table

In Denodo, you can configure a base view to indicate that its data is replicated in different data sources, and it will be able to use this information for optimization purposes. At runtime, when this base view is involved in a query, the optimizer will select the source that maximizes the number of operations that can be pushed down for that particular query.

In order for the optimizer to apply this optimization, you have to define the alternative sources of the base views. Let’s see how this can be done:

First, let’s create the base view customer from the customer table in Netezza in the usual way. As explained before, this data has also been replicated in Impala, so we can create an alternative source for this base view with the following steps:

  1. Open the new Netezza-source customer base view
  2. Go to the Options Tab
  3. Click on Search methods
  4. Click the green ‘+’ button next to ‘Alternative wrappers’
  5. Choose the virtual database and the Impala data source that contain the replicated customer table from the dropdowns next to ‘JDBC data source database’ and ‘JDBC data source’, respectively
  6. Check the box next to the customer table in the introspection tree
  7. Click Create selected > Ok

You can see the process in the following images (from Figure 21 to Figure 24):

Figure 21 Adding alternative wrappers to a view (a)

Figure 22 Adding alternative wrappers to a view (b)

Figure 23 Adding alternative wrappers to a view (c)

Figure 24 Adding alternative wrappers to a view (d)

As you can see in Figure 25 and Figure 26, the JOIN operation can now be pushed down below the UNION view, and each branch of the union is entirely pushed down to its source. If we had not configured the alternative wrapper, only one branch could be fully pushed down (Denodo would consider the customer data to be available only in Netezza or only in Impala, not in both).

Figure 25 Query Plan for last_year_sales_by_customer view when using alternative wrappers for customer (a)

Figure 26 Query Plan for last_year_sales_by_customer view when using alternative wrappers for customer (b)

In this optimized scenario, the number of tuples transferred through the network to Denodo is 1,999,984 (historical sales) + 51,590 (current year’s sales) = 2,051,574. Finally, Denodo applies the GROUP BY operation to produce the final 1,999,984 rows; this is the same result for the same query as shown in Figure 12, but with fewer tuples transferred thanks to the alternative wrappers.
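For illustration, with the alternative wrapper in place, each branch delegated to a source has roughly the shape below (an illustrative sketch, not the exact SQL Denodo generates; the date_dim join columns are the standard TPC-DS ones).

-- Fully delegated to Netezza; Impala receives the analogous query with
-- d_year < <current year> against its own replica of customer
SELECT c_customer_sk, c_first_name, c_last_name, c_preferred_cust_flag,
       c_birth_country, c_login, c_email_address,
       SUM(ss_ext_sales_price) AS partial_total
FROM store_sales
    JOIN date_dim ON (ss_sold_date_sk = d_date_sk)
    JOIN customer ON (c_customer_sk = ss_customer_sk)
WHERE d_year >= <current year>
GROUP BY c_customer_sk, c_first_name, c_last_name, c_preferred_cust_flag,
         c_birth_country, c_login, c_email_address

-- Denodo then unions both partial result sets and re-aggregates
-- (SUM of the partial totals) to produce the final 1,999,984 rows.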

References

Denodo Query Optimizations for the Logical Data Warehouse
