Apache Kafka is a distributed streaming platform. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log and is used for building real-time data pipelines and streaming apps.
As a streaming platform, Kafka has three key capabilities:
Denodo Kafka Custom Wrapper as a Consumer
The Denodo Kafka Custom Wrapper allows you to consume records in two different ways:
The Denodo Kafka Custom Wrapper distribution consists of:
To import the custom wrapper, follow these steps:
To create a new Kafka custom data source:
com.denodo.connect.kafka.wrapper.KafkaDateRangeConsumerWrapper
In the “Class name” field, the selector component can be used to switch between the following classes, depending on how you want to consume records:
Example of a custom properties file for SSL configuration, when you do not want to use the truststore of the JRE included in the Denodo Platform:
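The file contains standard Kafka consumer properties. A minimal sketch, assuming SSL-only encryption and hypothetical paths and passwords (adjust them to your environment):

security.protocol=SSL
ssl.truststore.location=/opt/certs/kafka.client.truststore.jks
ssl.truststore.password=changeit
# Only needed if the brokers also require client authentication (mutual SSL):
ssl.keystore.location=/opt/certs/kafka.client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit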
Valid configuration properties are documented at https://kafka.apache.org/documentation/#consumerconfigs
To create a new base view using the Kafka data source:
Avro Schema sample

{
  "namespace": "Message",
  "type": "record",
  "name": "EventMessage",
  "fields": [
    { "name": "machine", "type": "string" },
    { "name": "id", "type": "string" },
    { "name": "date", "type": "long" },
    { "name": "status", "type": "float" },
    { "name": "building", "type": "string" }
  ]
}
JSON Schema sample

{
  "title": "Message",
  "type": "object",
  "properties": {
    "type": { "type": "string" },
    "t": { "type": "number" },
    "k": { "type": "integer" }
  }
}
The fields mentioned above are common to all the consumption modes. However, some of the wrappers have extra fields. These are:
The configuration for accessing Kafka with Kerberos enabled requires supplying the Kerberos credentials to the wrapper's configuration.
The Kerberos parameters are:
! Note |
If you enter a literal that contains one of the special characters used to indicate interpolation variables (@, \, ^, {, }), you have to escape these characters with \. For example, if the Kerberos principal name contains @, you have to enter \@. |
! Note |
Except for ‘Kerberos Service Name’, all the previous parameters are mandatory when Kerberos is enabled. |
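For reference, these wrapper parameters correspond conceptually to the standard Kafka consumer SASL/GSSAPI settings shown below. This is only a sketch: the keytab path, principal and service name are hypothetical, and the wrapper derives the actual configuration from its own parameters.

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafka-client@EXAMPLE.COM";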
A ‘krb5.conf’ file should be present in the file system. Below is an example of the Kerberos configuration file:
[libdefaults]
  renew_lifetime = 7d
  forwardable = true
  default_realm = EXAMPLE.COM
  ticket_lifetime = 24h
  dns_lookup_realm = false
  dns_lookup_kdc = false

[domain_realm]
  <domain_name> = EXAMPLE.COM

[realms]
  EXAMPLE.COM = {
    admin_server = <your_admin_server>
    kdc = <your_kdc>
  }

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log
The algorithm to locate the krb5.conf file is the following:
Executing the command provides a prompt through which records in key:value format can be inserted.
Once inserted, the data can be consulted through Kafka's command-line consumer.
The inserted records can be seen in the output of the command execution.
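As an illustration, a typical round trip with the Kafka command-line tools could look like the following (broker address and topic name are placeholders, and the exact commands may differ from the ones used above):

# Produce key:value records; parse.key and key.separator make the prompt accept "key:value" lines
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic \
    --property parse.key=true --property key.separator=:

# Consume the records back, printing the keys as well
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic \
    --from-beginning --property print.key=true --property key.separator=: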
As the OFFSET, FETCH and LIMIT clauses are not delegated to the custom wrapper, using them can cause data loss: the wrapper reads (and, in incremental mode, commits) the records anyway, and the Denodo server discards the filtered-out rows afterwards. The same can happen with the WHERE clause.
Air ticket prices may vary every so often. In this context, Kafka can be very useful, since it allows each price change to be stored in a topic.
For this, a record with information relative to the flight at a certain moment could be stored (in JSON format, for instance) in that topic. This record would consist, for example, of the flight identifier and its associated price.
The Denodo Kafka Custom Wrapper, using the incremental topic reading mode, can act as a consumer of the information stored in that topic. In this way, you can check the price changes of a flight since the last time they were consulted.
On the other hand, through the wrapper's between-dates consumption mode for that topic, you can check how flight prices change over time.
The information obtained from the wrapper can be enriched by combining the information stored in the topic with the data of a view that contains the information about flights (flight identifier, airport of origin, airport of destination, date of departure, duration, etc.). This may be done through a JOIN clause.
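As a sketch, assuming hypothetical view and field names, such a combination could be expressed in VQL as:

SELECT f.flight_id, f.origin_airport, f.destination_airport,
       p.price, p.change_date
FROM flights f INNER JOIN flight_prices_kafka p
  ON f.flight_id = p.flight_id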
Finally, to keep the cache of that join view updated, a Denodo Scheduler job of type VDPCache may be used to preload the cache of the cached views at regular intervals (every day, for instance).
Depending on the consumption mode used by the wrapper, the data can be applied in different contexts. For example, the data collected through the time-interval reading mode can be very useful in Business Intelligence/Reporting Analytics applications, while, if the incremental reading mode is used, the data may be applicable in environments such as enterprise applications, web pages, etc.
In the following image you can see a general description of the air tickets use case.
Symptom
Importing the custom wrapper produces a Java Heap Space error:
Resolution
Increase the memory allocated to the VDP Admin Tool. To do so, follow these steps:
In this example, the memory has been increased to 2048 MB.
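The amount of memory is controlled by the maximum heap size option of the JVM; the value used in this example corresponds to:

-Xmx2048m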
Symptom
org.apache.kafka.common.network.SslTransportLayer [] - Failed to send SSL Close message
javax.net.ssl.SSLHandshakeException: DHPublicKey does not comply to algorithm constraints
Resolution
The problem arises when the wrapper makes use of its SSL capabilities and the server only supports weak ciphers. To solve it, there are two possibilities:
Locate the following entry:
jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, DH keySize < 1024, \
EC keySize < 224, DES40_CBC, RC4_40
And replace it with:
jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, \
EC keySize < 224, DES40_CBC, RC4_40
That is, 'DH keySize < 1024' must be deleted.
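The entry is defined in the java.security file of the JRE used by the Denodo Platform. As an assumption based on the standard JRE layout, the file would be located at a path such as:

<DENODO_HOME>/jre/lib/security/java.security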
! Note |
The first solution offered is more secure, so we recommend using it instead of the second one. |
Kafka official page:
Kafka & Confluent:
JSON Schema:
Authentication using Kerberos:
Encryption with SSL:
Encryption and Authentication with SSL:
From MapR documentation: “MapR Event Store is a global publish-subscribe event streaming system for big data.”
As the Kafka API provides access to MapR Event Store, you can use the Kafka Custom Wrapper to connect to MapR Streams. This section explains how to do that.
To connect to the MapR cluster you need to install the MapR Client on your client machine (where the VDP server is running):
Set the $MAPR_HOME environment variable to the directory where the MapR Client was installed. If the MAPR_HOME environment variable is not defined, /opt/mapr is used as the default path.
Copy mapr-clusters.conf from the MapR cluster to the $MAPR_HOME/conf folder in the VDP machine.
demo.mapr.com secure=true maprdemo:7222
Every user who wants to access a secure cluster must have a MapR ticket (maprticket_<username>) in the temporary directory (the default location).
Use the maprlogin command-line tool (under $MAPR_HOME/bin) to generate one:
C:\opt\mapr\bin>maprlogin.bat password -user mapr
[Password for user 'mapr' at cluster 'demo.mapr.com': ]
MapR credentials of user 'mapr' for cluster 'demo.mapr.com' are written to 'C:\Users\<username>\AppData\Local\Temp/maprticket_<username>'
Add -Dmapr.library.flatclass to the VDP Server JVM options.
VDP Server JVM options
Otherwise, VDP will throw the exception java.lang.UnsatisfiedLinkError from JNISecurity.SetParsingDone() while executing the Kafka Custom Wrapper.
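For example, the resulting JVM options could simply append the flag to whatever options are already set (the heap size below is just an illustration):

-Xmx4096m -Dmapr.library.flatclass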
In order to use the MapR vendor libraries, you should not import the Kafka Custom Wrapper into Denodo.
You have to create the custom data source using the ‘Classpath’ parameter instead of the ‘Select Jars’ option. Click Browse to select the directory containing the required dependencies for this custom wrapper:
The MapR Maven repository is located at http://repository.mapr.com/maven/. The names of the JAR files that you must use contain the versions of Hadoop, Kafka, ZooKeeper and MapR that you are using:
As the MapRClient native library is bundled in the maprfs-<mapr_version> JAR, you should use the maprfs JAR that comes with the previously installed MapR Client, since the native library is dependent on the operating system.
! Important |
The MapR native library is included in these Custom Wrapper dependencies and can be loaded only once. Therefore, if you plan to access other MapR sources with Denodo, such as:
you have to use the same classpath to configure all the custom wrappers and the JDBC driver; see 'C:\Work\MapR Certification\mapr-lib' in the image above. With this configuration Denodo can reuse the same classloader and load the native library only once. |
Configure the Kafka wrapper parameters as usual, with these main differences:
MapR base view edition