Denodo Kafka Custom Wrapper - User Manual

Introduction

Apache Kafka is a distributed streaming platform. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log and is used for building real-time data pipelines and streaming apps.

As a streaming platform, Kafka has three key capabilities:

  • Publish and subscribe to streams of records.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

Denodo Kafka Custom Wrapper as a Consumer

The Denodo Kafka Custom Wrapper allows you to consume records in two different ways:

  1. Consumer between dates: reads the records of a topic within a specific time interval. If no interval is given, all the records are read.

  2. Incremental consumer: reads the new records of a topic published since the last query was made.
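The two modes can be illustrated with a small sketch. The helper names below are hypothetical and only model the behavior; the real wrapper relies on the Kafka consumer API:

```python
def consume_between_dates(records, begin_ms=None, end_ms=None):
    """Consumer between dates: keep records whose timestamp (epoch millis)
    falls within [begin_ms, end_ms). With no interval, every record is read."""
    return [r for r in records
            if (begin_ms is None or r["timestamp"] >= begin_ms)
            and (end_ms is None or r["timestamp"] < end_ms)]


class IncrementalConsumer:
    """Incremental consumer: remembers how far it has read, so each query
    only returns the records published since the previous one."""
    def __init__(self):
        self.next_offset = 0

    def poll(self, records):
        new = records[self.next_offset:]
        self.next_offset = len(records)
        return new
```

A second call to `poll` on the same topic returns nothing until new records are appended.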

Installation

The Denodo Kafka Custom Wrapper distribution consists of:

  • /dist:

  • denodo-kafka-customwrapper-{denodo-version}-{version}.jar. The custom wrapper.

  • denodo-kafka-customwrapper-{denodo-version}-{version}-jar-with-dependencies.jar. The custom wrapper plus its dependencies. This is the jar we recommend using, as it is easier to install in VDP.
  • denodo-kafka-customwrapper-{denodo-version}-{version}-sources. The custom wrapper source code.

  • /lib: All the dependencies required by this wrapper in case you need to use the denodo-kafka-customwrapper-{denodo-version}-{version}.jar.

Usage

Import the Custom Wrapper

To import the custom wrapper, follow these steps:

  1. In the VDP Administration Tool, go to:


  • Denodo 6.0: File → Jar management

  • From Denodo 7.0: File → Extension management

  2. Click the “Create” button and select the “denodo-kafka-customwrapper-{denodo-version}-{version}-jar-with-dependencies.jar” file, located in the dist folder of the Denodo Kafka Custom Wrapper distribution, downloaded from the Denodo Support Site.

Create the Kafka data source

To create a new Kafka custom data source:

  1. In the VDP Administration Tool, go to: File → New… → Data source → Custom

  2. In the “Create a New Custom Data Source” window, do the following:

  • Set a name for the new Kafka data source in the “Name” field.

  • Click on “Select Jars” and select the file imported in the previous section.

  • The “Class name” field must be filled with:

com.denodo.connect.kafka.wrapper.KafkaDateRangeConsumerWrapper

Depending on the way you want to consume records, the selector component of the “Class name” field can be used to switch between these classes:

  • com.denodo.connect.kafka.wrapper.KafkaDateRangeConsumerWrapper: consumer between dates

  • com.denodo.connect.kafka.wrapper.KafkaIncrementalConsumerWrapper: incremental consumer

  3. Click the “Save” button.

Create the base view

To create a new base view using the Kafka data source:

  1. Double-click on the Kafka data source and then click on “Create base view”.

  2. Set the parameters as follows:

  • Connection String (mandatory): a comma-separated list of host:port pairs with the addresses of the Kafka brokers. Each pair uses : as the host-port separator, e.g. <kafka_host>:9092.
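For illustration, a connection string of this shape can be split into host and port pairs as below (the parser is a hypothetical sketch, not part of the wrapper):

```python
def parse_connection_string(conn):
    """Split "host1:9092,host2:9093" into [("host1", 9092), ("host2", 9093)]."""
    pairs = []
    for entry in conn.split(","):
        # rpartition tolerates hosts that themselves contain no colon.
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs
```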

  • Topic (mandatory): Kafka brokers contain topics, which act like message queues where client applications can write and read their data. All Kafka messages are organized into topics: producers write data to topics and consumers read from them.

  • Message Format (mandatory): the format of the messages. Supported formats are:

  • String

  • Avro

  • JSON

  • Schema Path: path to the file that contains the schema of the records when their format is Avro or JSON. This field is mandatory when “Avro” or “JSON” has been selected in the “Message Format” field.

Avro Schema sample

{

  "namespace": "Message",

  "type": "record",

  "name": "EventMessage",

  "fields": [

        { "name": "machine", "type": "string" },

        { "name": "id", "type": "string" },

        { "name": "date", "type": "long" },

        { "name": "status", "type": "float" },

        { "name": "building", "type": "string" }

  ]

}

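As an illustration of what the schema describes, the stdlib-only sketch below parses the Avro schema above and checks that a record provides every declared field with a roughly matching Python type. This is not the Avro library, just a hypothetical helper:

```python
import json

# Rough mapping from the Avro types used in the sample to Python types.
AVRO_TO_PY = {"string": str, "long": int, "float": float}

def matches_schema(schema_text, record):
    """True if the record has every schema field with a compatible type."""
    schema = json.loads(schema_text)
    return all(
        field["name"] in record
        and isinstance(record[field["name"]], AVRO_TO_PY[field["type"]])
        for field in schema["fields"]
    )
```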

JSON Schema sample

{

    "title": "Message",

    "type": "object",

    "properties": {

          "type": { "type": "string" },

          "t": { "type": "number" },

          "k": { "type": "integer" }

    }

}
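The checks a validator performs against the JSON Schema above can be sketched by hand with the stdlib. Real deployments would use a dedicated validator library; this helper is only illustrative:

```python
import json

# JSON Schema primitive types vs. the Python types they accept.
JSON_TO_PY = {"string": str, "number": (int, float), "integer": int}

def validate_message(schema_text, message_text):
    """True if every property present in the message has the declared type."""
    schema = json.loads(schema_text)
    message = json.loads(message_text)
    return all(
        isinstance(message[name], JSON_TO_PY[spec["type"]])
        for name, spec in schema["properties"].items()
        if name in message
    )
```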

  • Connection Timeout (ms): the maximum time (in milliseconds) that the custom wrapper will stay connected to Kafka to consume data. This timeout is necessary because the Kafka consumer continually waits for new records, so a maximum connection time must be set for the wrapper's data consumption to eventually end. The default is 10 minutes (600000 ms).

  • Consumer Timeout (ms): the maximum time (in milliseconds) that the custom wrapper will wait to receive data when no data is available in the buffer. If 0, it returns immediately with any records currently available in the buffer. Must not be negative. The default is 7 seconds (7000 ms).
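How the two timeouts interact can be sketched as follows; `poll` stands in for the Kafka consumer's poll call, and the loop shape is an assumption about the wrapper's behavior, not its actual code:

```python
import time

def consume(poll, connection_timeout_ms=600_000, consumer_timeout_ms=7_000):
    """The connection timeout bounds the whole consuming session; the
    consumer timeout bounds each individual poll when the buffer is empty."""
    deadline = time.monotonic() + connection_timeout_ms / 1000
    results = []
    while time.monotonic() < deadline:
        # poll() waits at most consumer_timeout_ms for new records.
        results.extend(poll(consumer_timeout_ms))
    return results
```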

  • SSL Enabled: if checked, the SSL protocol will be used. Note that if the Kafka broker certificate is self-signed, it must be imported into the truststore of the JRE used by the Denodo Server, i.e. <DENODO_HOME>/jre/lib/security/cacerts.

  • Custom Properties Path: the path to a properties file that contains extra Kafka configuration properties. Note that if a configuration property is present both in the properties file and in the wrapper configuration fields, the value entered through the wrapper configuration fields always takes priority.

Example of a custom properties file for SSL configuration when you do not want to use the truststore of the JRE included in the Denodo Platform:

ssl.truststore.location=/path/to/truststore

ssl.truststore.password=test1234

Valid configuration properties are documented at https://kafka.apache.org/documentation/#consumerconfigs
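The precedence rule between the properties file and the wrapper fields can be sketched with stdlib-only, hypothetical helpers: properties are read from the file, then any value set through the wrapper fields overrides the file.

```python
def load_properties(text):
    """Tiny .properties reader: one key=value per line, '#' for comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def effective_config(file_props, wrapper_fields):
    """Wrapper configuration fields always take priority over the file."""
    merged = dict(file_props)
    merged.update({k: v for k, v in wrapper_fields.items() if v is not None})
    return merged
```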

The fields mentioned above are common to both consumer modes. However, some of the wrappers have extra fields. These are:

  • Incremental consumer:

  • Group Id (mandatory): specifies the name of the Kafka Consumer Group the custom wrapper will belong to.

  • Consumer between dates:

  • Begin Date (optional): the minimum date from which records are obtained (inclusive).

  • End Date (optional): the maximum date up to which records are obtained (exclusive).

Secure cluster with Kerberos

The configuration for accessing Kafka with Kerberos enabled requires supplying the Kerberos credentials to the wrapper's configuration.

The Kerberos parameters are:

  • Kerberos principal name: Kerberos v5 Principal name to access Kafka, e.g. primary/instance@realm.

! Note

If you enter a literal that contains one of the special characters used to indicate interpolation variables (@, \, ^, {, }), you have to escape these characters with \.

E.g., if the Kerberos principal name contains @, you have to enter \@.
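A sketch of that escaping rule (the helper below is hypothetical; VDP applies this per its literal syntax):

```python
# The characters VDP uses for interpolation variables.
SPECIAL_CHARS = set("@\\^{}")

def escape_literal(value):
    """Prefix each interpolation-variable character with a backslash."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in value)
```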

  • Kerberos keytab file: Keytab file containing the key of the Kerberos principal.

  • Kerberos Distribution Center: Kerberos Key Distribution Center.

  • Kerberos Service Name: the custom wrapper's configuration must include a property named 'sasl.kerberos.service.name', whose default value is 'kafka'. If a different value is required, add a new entry for the property in the properties file loaded through the 'Custom Properties Path' field. For example, the file must contain a line like this: sasl.kerberos.service.name=your_service_name

! Note

Except ‘Kerberos Service Name’, all the previous parameters are mandatory when Kerberos is enabled.

A krb5.conf file should be present in the file system. Below there is an example of the Kerberos configuration file:

[libdefaults]

  renew_lifetime = 7d

  forwardable = true

  default_realm = EXAMPLE.COM

  ticket_lifetime = 24h

  dns_lookup_realm = false

  dns_lookup_kdc = false

[domain_realm]

  <domain_name> = EXAMPLE.COM

[realms]

  EXAMPLE.COM = {

    admin_server = <your_admin_server>

    kdc = <your_kdc>

  }

[logging]

  default = FILE:/var/log/krb5kdc.log

  admin_server = FILE:/var/log/kadmind.log

  kdc = FILE:/var/log/krb5kdc.log

The algorithm to locate the krb5.conf file is the following:

  • If the system property java.security.krb5.conf is set, its value is assumed to specify the path and file name.

  • If that system property is not set, then the configuration file is looked for in the directory

  • <java-home>\lib\security (Windows)
  • <java-home>/lib/security (Solaris and Linux)

  • If the file is still not found, then an attempt is made to locate it as follows:

  • /etc/krb5/krb5.conf (Solaris)
  • c:\winnt\krb5.ini (Windows)
  • /etc/krb5.conf (Linux)
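The lookup order above can be expressed as a small sketch. The `exists` callback is injected so the logic can be shown without touching the real file system, and the exact file name under <java-home>/lib/security is an assumption:

```python
def locate_krb5_conf(java_home, system_properties, is_windows, exists):
    """Return the path to krb5.conf following the documented search order."""
    # 1. An explicit java.security.krb5.conf system property wins.
    path = system_properties.get("java.security.krb5.conf")
    if path:
        return path
    # 2. Look inside <java-home>/lib/security.
    sep = "\\" if is_windows else "/"
    candidate = sep.join([java_home, "lib", "security", "krb5.conf"])
    if exists(candidate):
        return candidate
    # 3. Fall back to the OS-specific default location.
    fallback = "c:\\winnt\\krb5.ini" if is_windows else "/etc/krb5.conf"
    return fallback if exists(fallback) else None
```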

Example

  1. To show an example of executing the wrapper, a set of messages has been published in Kafka. These records were added to the topic ‘my-test-topic’ using the command-line producer included in the Kafka installation.

The command provides a prompt through which records in key:value format are inserted.

Once inserted, the data can be checked with Kafka’s command-line consumer. The inserted records can be seen in the output of the command execution.

  2. Create a base view:

  • Connection String = 192.168.56.102:9092

  • Topic = my-test-topic

  • Message Format: String


  3. The schema of the base view is shown, and you can rename it.

  4. After clicking on “Ok”, you can execute SELECT queries, for example:

  • SELECT * FROM bv_kafka;


Limitations

Incremental wrapper data loss

As the OFFSET, FETCH and LIMIT clauses are not delegated to the custom wrapper, using these clauses can cause data loss. The same can happen with the WHERE clause.
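The loss can be modeled with a toy sketch (hypothetical classes, not the wrapper's code): because LIMIT is applied after the wrapper has already advanced its committed offset, the discarded rows never reappear in later queries.

```python
class ToyIncrementalConsumer:
    """Advances its committed offset past everything it reads."""
    def __init__(self):
        self.committed = 0

    def read_new(self, topic):
        new = topic[self.committed:]
        self.committed = len(topic)
        return new

def query_with_limit(consumer, topic, limit):
    # LIMIT is not delegated: it trims rows *after* the full read.
    return consumer.read_new(topic)[:limit]
```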

Use case

Prices of an airline

Air tickets may have variations in their price every so often. In this context, Kafka can be very useful, since it allows each price change to be stored in a topic.

For this, a record with information relative to the flight at a certain moment could be stored (in JSON format, for instance) in that topic. This record would consist, for example, of the flight identifier and its associated price.

Denodo Kafka Custom Wrapper, using the incremental topic reading mode, can act as a consumer of the information stored in that topic. In this way, you can check the price changes of a flight since the last time they were consulted.

On the other hand, through the between-dates consumption mode of the Denodo Kafka Custom Wrapper, you can check how flight prices change over time.

The information obtained from the wrapper can be enriched by combining the information stored in the topic with the data of a view that contains the information about flights (flight identifier, airport of origin, airport of destination, date of departure, duration, etc.). This may be done through a JOIN clause.

Finally, to keep the cache of that join view updated, a Denodo Scheduler job of type VDPCache may be used to preload the cache of the cached views at regular intervals (every day, for instance).

Depending on the consumption mode used by the wrapper, the data can be applied in different contexts. For example, the data collected through the time-interval reading mode can be very useful in Business Intelligence/Reporting Analytics applications, while the incremental reading mode suits environments such as enterprise applications, web pages, etc.

In the following image you can see a general description of the air tickets use case.

Troubleshooting

Symptom

Importing the custom wrapper produces a Java Heap Space error.

Resolution

Increase the memory allocated to the VDP Administration Tool. To do so, follow these steps:

  1. In the Denodo Platform Control Center, go to ‘Configure’.

  2. Click on ‘JVM Options’.

  3. On the ‘Memory Options’ tab, increase the memory allocated to the Virtual DataPort Administration Tool. In this example, the memory has been increased to 2048 MB.

Symptom

org.apache.kafka.common.network.SslTransportLayer [] - Failed to send SSL Close message  

javax.net.ssl.SSLHandshakeException: DHPublicKey does not comply to algorithm constraints

Resolution

The problem arises when the wrapper makes use of its SSL capabilities and the server only supports weak ciphers. To solve it, there are two possibilities:

  1. Upgrade the Java version of Kafka's server.

  2. Open the file 'java.security' located in '<DENODO_HOME>\jre\lib\security'.

Locate the following entry:

jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, DH keySize < 1024, \

        EC keySize < 224, DES40_CBC, RC4_40

And replace it with:

jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, \

        EC keySize < 224, DES40_CBC, RC4_40

That is, 'DH keySize < 1024' must be deleted.

! Note

The first solution offered is more secure, so we recommend using it instead of the second one.

References

Kafka official page:

Kafka & Confluent:

JSON Schema:

Authentication using Kerberos:

Encryption with SSL

Encryption and Authentication with SSL

Appendix

How to connect to MapR Event Store (Streams)

From the MapR documentation: “MapR Event Store is a global publish-subscribe event streaming system for big data.”

As the Kafka API provides access to MapR Event Store, you can use the Kafka Custom Wrapper to connect to MapR Streams. This section explains how to do that.

Install MapR Client

To connect to the MapR cluster you need to install the MapR Client on your client machine (where the VDP server is running):


  • Verify that the operating system on the machine where you plan to install the MapR Client is supported; see the MapR Client Support Matrix.

  • Set the $MAPR_HOME environment variable to the directory where the MapR Client was installed. If MAPR_HOME is not defined, /opt/mapr is the default path.

Copy mapr-clusters.conf file

Copy the mapr-clusters.conf file from the MapR cluster to the $MAPR_HOME/conf folder on the VDP machine. For example:

demo.mapr.com secure=true maprdemo:7222

Generate MapR ticket (secure clusters only)

Every user who wants to access a secure cluster must have a MapR ticket (maprticket_<username>) in the temporary directory (the default location).

Use the $MAPR_HOME/maprlogin command line tool to generate one:

C:\opt\mapr\bin>maprlogin.bat password -user mapr

[Password for user 'mapr' at cluster 'demo.mapr.com': ]

MapR credentials of user 'mapr' for cluster 'demo.mapr.com' are written to 'C:\Users\<username>\AppData\Local\Temp/maprticket_<username>'

! Note

If you get an error like

java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty

when executing maprlogin, you need to specify a truststore before executing the maprlogin command.

For this, you can copy /opt/mapr/ssl_truststore from the MapR cluster to the $MAPR_HOME/conf directory on the local machine.

Add JVM option

Add -Dmapr.library.flatclass to the VDP Server JVM options.

VDP Server JVM options

Otherwise, VDP will throw the exception java.lang.UnsatisfiedLinkError from JNISecurity.SetParsingDone() while executing the Kafka Custom Wrapper.

Create custom data source

In order to use the MapR vendor libraries, you should not import the Kafka Custom Wrapper into Denodo.

You have to create the custom data source using the ‘Classpath’ parameter instead of the ‘Select Jars’ option. Click Browse to select the directory containing the required dependencies for this custom wrapper:

  • The denodo-kafka-customwrapper-${version}.jar file of the dist directory of this custom wrapper distribution.

  • The contents of the lib directory of this custom wrapper distribution, replacing the Apache Hadoop libraries with the MapR ones.

The MapR Maven repository is located at http://repository.mapr.com/maven/. The names of the JAR files that you must use contain the versions of Hadoop, Kafka, Zookeeper and MapR that you are using:

  • hadoop-xxx-<hadoop_version>-<mapr_version>
  • kafka-xxx-<hadoop_version>-<mapr_version>
  • connect-xxx-<kafka version>-<mapr_version>
  • maprfs-<mapr_version>

As the MapRClient native library is bundled in the maprfs-<mapr_version> jar, you should use the maprfs jar that comes with the previously installed MapR Client, as the library is operating-system dependent.

  • mapr-streams-<mapr_version>
  • zookeeper-<zookeeper_version>-<mapr_version>
  • json-<version>
  • the other dependencies of the lib directory of this custom wrapper distribution

! Important

The MapR native library is included in these Custom Wrapper dependencies and can be loaded only once.

Therefore, if you plan to access other MapR sources with Denodo, like:

  • MapR FileSystem with the HDFS Custom Wrapper
  • MapR Database with the HBase Custom Wrapper
  • Drill with the JDBC Wrapper

you have to use the same classpath to configure all the custom wrappers and the JDBC driver; see 'C:\Work\MapR Certification\mapr-lib' in the image above.

With this configuration, Denodo can reuse the same classloader and load the native library only once.

Configure base view

Configure the Kafka wrapper parameters as usual, with these main differences:

  • The connection string is the MapR cluster address, as there are no brokers in the MapR Event Store implementation.

  • The topic field should follow the MapR Streams naming convention /stream:topic, e.g. /sample-stream:fast-messages.
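That naming convention can be split apart with a small sketch (hypothetical helper, for illustration only):

```python
def split_mapr_topic(full_name):
    """Split a '/stream:topic' name into ('/stream', 'topic')."""
    stream, _, topic = full_name.rpartition(":")
    return stream, topic
```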

MapR base view edition