Denodo Dialect for Spark SQL - User Manual


Introduction

Spark SQL is a Spark module for structured data processing. A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

Spark SQL includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag.
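
For instance, a DataFrame read through JDBC can be joined directly with a DataFrame loaded from a file. A minimal sketch, where departments.csv and the department_id join key are hypothetical, and employees stands for a DataFrame read through the JDBC source as shown later in this document:

// Minimal sketch: a DataFrame read over JDBC joins like any other.
// "departments.csv" and the "department_id" key are hypothetical;
// "employees" is a DataFrame loaded through the JDBC source.
val departments = spark.read.option("header", "true").csv("departments.csv")
employees.join(departments, Seq("department_id")).show()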

This JDBC data source includes several dialects (PostgreSQL, Oracle, MySQL...). The Denodo Dialect for Spark SQL allows Denodo to be seamlessly integrated with DataFrames.

The Denodo dialect is compatible with versions 2.4 and 3.0 of Apache Spark.

Installation and Configuration

This dialect can be downloaded from the Denodo Support Site.

Once the zip file is downloaded and unzipped, copy the denodo-dialect-sparksql-{version}.jar file into the $SPARK_HOME/jars directory.

To start a spark-shell, you need to include the Denodo JDBC driver on the Spark classpath:

./bin/spark-shell --driver-class-path denodo-vdp-jdbcdriver.jar --jars denodo-vdp-jdbcdriver.jar

  • --jars: Comma-separated list of local jars to include on the driver and executor classpaths.
  • --driver-class-path: Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

Once the spark-shell has started up correctly, you need to register the Denodo dialect before using it. To do that, run the following code:

import com.denodo.connect.spark.sql.jdbc.DenodoDialect
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(DenodoDialect)

Execution

You can use this dialect to read data from Denodo views; it does not support write operations (see the Limitations section).

As with any other dialect, data is loaded through JDBC using the Data Sources API: configure a reader with the jdbc format and call its load() method. This is an example using the spark-shell:

// Load data from a Denodo JDBC source
scala> val df = spark.read
        .format("jdbc")
        .option("url", "jdbc:vdb://localhost:9999/admin")
        .option("driver", "com.denodo.vdp.jdbc.Driver")
        .option("dbtable", "bv_employees")
        .option("user", "admin")
        .option("password", "admin")
        .load()

As mentioned earlier, the JDBC data source returns its results as a DataFrame, so its rows can be filtered and processed like those of any other DataFrame.

The Spark filter() and where() functions filter the rows of a DataFrame based on a given condition or SQL expression. Here are some examples:

// Select the "id" and "email" column and add a filter for emails

// containing "%com.com%"

scala> df.select("id","email")

        .filter('email.contains("%com.com%"))

        .show()

+---+-----------------+

| id|            email|

+---+-----------------+

|  1| ajordan0@com.com|

|102|jharvey2t@com.com|

|449|ebarnescg@com.com|

|496|afloresdr@com.com|

|901| cramosp0@com.com|

+---+-----------------+

// Select the "id" and "first_name" columns and add a filter for "first_name"

// equals "Carlos"

scala> df.select("id","first_name")

        .filter(df("first_name") === "Carlos")

        .show()

+---+----------+

| id|first_name|

+---+----------+

|  5|    Carlos|

| 83|    Carlos|

|373|    Carlos|

|392|    Carlos|

|400|    Carlos|

|444|    Carlos|

|456|    Carlos|

|533|    Carlos|

|927|    Carlos|

+---+----------+

// Select the "id", "first_name" and "salary" columns and add a filter for

// first_name" equals "Carlos" and "salary" greater than 90000

scala> df.select("id","first_name","salary")

        .filter(df("first_name") === "Carlos" && df("salary") > 90000 )

        .show()

+---+----------+---------+

| id|first_name|   salary|

+---+----------+---------+

|373|    Carlos|247662.42|

|392|    Carlos|245592.99|

|444|    Carlos|111361.12|

|456|    Carlos|175537.91|

|533|    Carlos|261618.16|

+---+----------+---------+

// Select the "id", "first_name" and "salary" columns and add a filter for

// first_name" equals "Carlos" and "salary" greater than 90000. We also add

// a where clause for "id" between 300 and 500

scala> df.select("id","first_name","salary")

        .filter(df("first_name") === "Carlos" && df("salary") > 90000 )

        .where("id between 300 and 500")

        .show()

+---+----------+---------+

| id|first_name|   salary|

+---+----------+---------+

|373|    Carlos|247662.42|

|392|    Carlos|245592.99|

|444|    Carlos|111361.12|

|456|    Carlos|175537.91|

+---+----------+---------+

// Select the "id", "first_name" and "salary" columns and add a filter for

// first_name" equals "Carlos", "salary" greater than 90000 and

// "registration_dttm" greater than "2016-02-03 00:46:47". We also add

// a where clause for "id" between 300 and 500

scala> df.select("id","first_name","salary","registration_dttm")

        .filter(df("first_name") === "Carlos" && df("salary") > 90000 && df("registration_dttm") > "%2016-02-03 00:46:47%")

        .where("id between 300 and 500")

        .show()

+---+----------+---------+-------------------+

| id|first_name|   salary|  registration_dttm|

+---+----------+---------+-------------------+

|373|    Carlos|247662.42|2016-02-03 22:46:47|

|392|    Carlos|245592.99|2016-02-03 06:22:45|

|444|    Carlos|111361.12|2016-02-03 05:29:13|

|456|    Carlos|175537.91|2016-02-03 15:26:40|

+---+----------+---------+-------------------+
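
Spark pushes these filters down to Denodo through the JDBC source whenever possible, so they are evaluated by Denodo rather than in Spark. To verify which predicates are pushed down, you can inspect the physical plan with explain(); the JDBC scan node lists them under PushedFilters. A quick sketch (the exact plan output varies between Spark versions):

// Sketch: inspect the physical plan to see which filters are pushed
// down to Denodo. The exact output format varies between Spark versions.
scala> df.select("id","first_name","salary")
        .filter(df("first_name") === "Carlos" && df("salary") > 90000)
        .explain()
// The JDBC scan node of the plan lists the pushed predicates, e.g.:
// PushedFilters: [*IsNotNull(first_name), *IsNotNull(salary),
//                 *EqualTo(first_name,Carlos), *GreaterThan(salary,90000.0)]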

The same example written in Java would look like this. It runs Spark in local mode, and in this case it is necessary to include both the Denodo Spark dialect and the Denodo JDBC driver in the classpath.

// Imports needed for the example
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

import com.denodo.connect.spark.sql.jdbc.DenodoDialect$;

// First of all, we register the dialect
DenodoDialect$ denodoDialect = DenodoDialect$.MODULE$;
JdbcDialects.registerDialect(denodoDialect);

// Create a SparkSession. master() specifies the master URL for a distributed
// cluster; "local[*]" runs Spark locally with as many worker threads as
// logical cores.
final SparkSession spark =
  SparkSession.builder().appName("DenodoTest").master("local[*]").getOrCreate();

// Load data from a Denodo JDBC source
Dataset<Row> df =
  spark.read().format("jdbc").option("url", "jdbc:vdb://localhost:9999/admin")
    .option("driver", "com.denodo.vdp.jdbc.Driver").option("dbtable", "bv_employees")
    .option("user", "admin").option("password", "admin").load();

// Select the "id" and "email" columns and add a filter for emails
// containing "com.com"
df.select("id","email")
  .filter(col("email").contains("com.com")).show();

// Select the "id" and "first_name" columns and add a filter for "first_name"
// equals "Carlos"
df.select("id","first_name")
  .filter(col("first_name").equalTo("Carlos")).show();

// Select the "id", "first_name" and "salary" columns and add a filter for
// "first_name" equals "Carlos" and "salary" greater than 90000
df.select("id","first_name", "salary")
  .filter(col("first_name").equalTo("Carlos"))
  .filter(col("salary").$greater(90000)).show();

// Select the "id", "first_name" and "salary" columns and add a filter for
// "first_name" equals "Carlos" and "salary" greater than 90000. We also add
// a filter for "id" between 300 and 500
df.select("id","first_name", "salary")
  .filter(col("first_name").equalTo("Carlos"))
  .filter(col("salary").$greater(90000))
  .filter(col("id").between(300,500)).show();

// Select the "id", "first_name", "salary" and "registration_dttm" columns
// and add a filter for "first_name" equals "Carlos", "salary" greater than
// 90000 and "registration_dttm" greater than or equal to
// "2016-02-02 21:04:03". We also add a filter for "id" between 300 and 500
df.select("id","first_name", "salary","registration_dttm")
  .filter(col("first_name").equalTo("Carlos"))
  .filter(col("salary").$greater(90000))
  .filter(col("id").between(300,500))
  .filter(col("registration_dttm").$greater$eq("2016-02-02 21:04:03")).show();

Limitations

Composite Types

The dialect cannot work with composite Denodo types, such as array and register. If a projection or a filter is applied to any of these data types, the value will be treated as a String.
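
In practice, this means a composite column can still be projected and filtered, but only with String semantics. For instance (a sketch; the "address" column name is hypothetical):

// Sketch: a composite column (e.g. a Denodo register) is read as a
// String, so only String operations apply to it. The "address" column
// name is hypothetical.
scala> df.select("id","address")
        .filter(df("address").contains("Madrid")) // substring match on the serialized value
        .show()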

Read-only

This dialect is designed for read operations only.
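
If you need to persist query results, write them to a different sink, for example Parquet files (a sketch, with a hypothetical output path):

// Sketch: since writing back to Denodo is not supported, persist the
// results to another sink, e.g. Parquet. The output path is hypothetical.
scala> df.filter(df("salary") > 90000)
        .write
        .mode("overwrite")
        .parquet("/tmp/high_salary_employees")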