Apache Kafka is a distributed streaming platform. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log and is used for building real-time data pipelines and streaming apps.
As a streaming platform, it has three key capabilities:
Denodo Kafka Custom Wrapper as a Consumer
The Denodo Kafka Custom Wrapper allows you to consume records in two different ways:
The Denodo Kafka Custom Wrapper distribution consists of:
To import the custom wrapper, follow these steps:
To create a new Kafka custom data source:
Example of a custom properties file for SSL configuration when you do not want to use the truststore of the JRE included in the Denodo Platform:
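A minimal sketch of such a properties file, using the standard Kafka consumer SSL settings (the path and passwords are placeholders, not values from this guide):

```properties
# Standard Kafka consumer SSL settings; adjust paths and passwords
# to your own environment.
security.protocol=SSL
ssl.truststore.location=C:/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore-password>
```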
Valid configuration properties are documented at https://kafka.apache.org/documentation/#consumerconfigs
As mentioned, the Denodo Kafka Custom Wrapper offers two different implementations:
The Date Range implementation retrieves all messages in a topic within a specified date range (begin date inclusive, end date exclusive).
Executions of base views created on this wrapper can be concurrent, and they should always return the same result set as long as the message dataset on the server does not vary.
This wrapper requires a prefix to be defined for the consumer group ID. Each execution then uses a new, random consumer group ID starting with that prefix, which prevents concurrent executions from being assigned different sets of partitions in the topic and therefore returning partial data sets (the standard partition-balancing behavior in Kafka).
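The behavior above can be sketched in Python (a simulation of the semantics, not the wrapper's actual code): messages are filtered by timestamp with an inclusive begin and exclusive end, and each execution builds a fresh consumer group ID from the configured prefix.

```python
import uuid

def new_group_id(prefix):
    # Each execution gets a fresh, random consumer group ID with the
    # configured prefix, so concurrent executions never share a group
    # and each one reads all partitions.
    return f"{prefix}-{uuid.uuid4()}"

def read_date_range(messages, begin, end):
    # messages is a list of (timestamp, payload) pairs;
    # begin is inclusive, end is exclusive.
    return [m for ts, m in messages if begin <= ts < end]

msgs = [(1000, "a"), (2000, "b"), (3000, "c")]
print(read_date_range(msgs, 1000, 3000))  # ['a', 'b']
```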
The Incremental implementation asks the server for the latest messages added to the topic’s partitions since the last time the base view was executed.
Executions of base views created on this wrapper consume the available messages and then commit their latest read offset to the server, so that the next time the view is executed the server will only send messages with newer offsets (if any). For this reason the wrapper requires a consumer group ID to be fixed in the base view configuration, which will be used for all executions of that base view.
Note that concurrent executions of this wrapper for the same base view, or for base views reading from the same topic that also share the same consumer group ID (which is not recommended), will cause the Kafka server to rebalance topic partitions among the wrapper executions and therefore send a different subset of the results to each of the concurrent executions.
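The incremental semantics can be sketched as follows (again a simulation, not the wrapper's code): the committed offset for the fixed consumer group determines where the next execution starts reading.

```python
class IncrementalConsumer:
    """Simulates incremental reads: the last read offset is committed
    per consumer group, so each poll only returns newer messages."""

    def __init__(self):
        self.committed = {}  # group_id -> next offset to read

    def poll(self, group_id, log):
        start = self.committed.get(group_id, 0)
        batch = log[start:]
        self.committed[group_id] = len(log)  # commit latest read offset
        return batch

log = ["m0", "m1", "m2"]
c = IncrementalConsumer()
print(c.poll("flights-view", log))  # ['m0', 'm1', 'm2']
log.append("m3")
print(c.poll("flights-view", log))  # ['m3'] - only newer offsets
```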
To create a new base view using the Kafka data source:
Avro Schema sample

{
  "namespace": "Message",
  "type": "record",
  "name": "EventMessage",
  "fields": [
    { "name": "machine", "type": "string" },
    { "name": "id", "type": "string" },
    { "name": "date", "type": "long" },
    { "name": "status", "type": "float" },
    { "name": "building", "type": "string" }
  ]
}
JSON Schema sample

{
  "title": "Message",
  "type": "object",
  "properties": {
    "type": { "type": "string" },
    "t": { "type": "number" },
    "k": { "type": "integer" }
  }
}
Protobuf Schema sample

syntax = "proto3";

package test.protos;

option java_multiple_files = true;

message FirstMessage {
  int32 id = 1;
  string name = 2;
  map<int64, SecondMessage> messages = 3;
  oneof company_oneof {
    repeated int64 numInfo = 4;
    SecondMessage newMessage = 5;
  }
}

message SecondMessage {
  enum Enumeration {
    ONE = 0;
    TWO = 1;
  }
  int64 id = 1;
  string name = 2;
  Enumeration enumeration = 3;
}
The fields mentioned above are common to both consumption modes. However, some of the wrappers include extra fields. These are:
The configuration for accessing Kafka with Kerberos enabled requires supplying the Kerberos credentials to the wrapper's configuration.
The Kerberos parameters are:
! Note

If you enter a literal that contains one of the special characters used to indicate interpolation variables (@, \, ^, { or }), you have to escape it with \. E.g., if the Kerberos principal name contains @, you have to enter \@.
! Note

All of the above parameters except ‘Kerberos Service Name’ are mandatory when Kerberos is enabled.
A ‘krb5.conf’ file should be present in the file system. Below is an example of the Kerberos configuration file:
[libdefaults]
  renew_lifetime = 7d
  forwardable = true
  default_realm = EXAMPLE.COM
  ticket_lifetime = 24h
  dns_lookup_realm = false
  dns_lookup_kdc = false

[domain_realm]
  <domain_name> = EXAMPLE.COM

[realms]
  EXAMPLE.COM = {
    admin_server = <your_admin_server>
    kdc = <your_kdc>
  }

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log
The algorithm to locate the krb5.conf file is the following:
The execution of the command provides a prompt through which records in key:value format are inserted.
Once inserted, the data can be consulted through Kafka’s command-line consumer.
The inserted records can be seen in the output of the command execution.
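For reference, the steps above are typically performed with the console tools shipped with Kafka; the broker address, topic name and key separator below are illustrative placeholders, not values from this guide:

```shell
# Produce key:value records (the prompt accepts one record per line)
kafka-console-producer.sh --broker-list localhost:9092 --topic prices \
    --property parse.key=true --property key.separator=:

# Consume the records back, printing keys as well
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic prices \
    --from-beginning --property print.key=true
```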
Because the OFFSET, FETCH and LIMIT clauses are not delegated to the custom wrapper, using them can cause data loss. The same can happen with the WHERE clause.
Schema registries are not part of the Avro, Protobuf or JSON standards, but they are a fairly common tool offered by Kafka messaging providers. The Denodo Kafka Custom Wrapper supports retrieving schemas from any schema registry that can return a schema as the payload of a URL, and also supports the Confluent Schema Registry, which returns schemas in a custom payload format.
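As an illustration of the Confluent payload format (a sketch based on the publicly documented wire format, not this wrapper's code): each message is prefixed with a magic byte (0) and a 4-byte big-endian schema ID, followed by the serialized record itself.

```python
import struct

def split_confluent_payload(raw: bytes):
    # Confluent wire format: magic byte 0, then a 4-byte big-endian
    # schema ID, then the serialized record itself.
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed message")
    return schema_id, raw[5:]

payload = b"\x00\x00\x00\x00\x2a" + b"record-bytes"
print(split_confluent_payload(payload))  # (42, b'record-bytes')
```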
Note, however, that there are some limitations to the use of schema registries in this wrapper:
Only one logical type is supported in Avro schemas: decimal.
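For reference, per the Avro specification a decimal logical type is carried as bytes holding the unscaled value as a big-endian two's-complement integer, with the scale taken from the schema. A minimal Python sketch of the decoding:

```python
from decimal import Decimal

def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    # The bytes hold the unscaled value as a big-endian two's-complement
    # integer; the scale comes from the schema's logicalType attributes.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

# 123456 with scale 2 -> 1234.56
print(decode_avro_decimal((123456).to_bytes(3, "big", signed=True), 2))
```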
Air ticket prices can change every so often. In this context, Kafka can be very useful, since it allows each price change to be stored in a topic.
For this, a record with information about the flight at a certain moment could be stored (in JSON format, for instance) in that topic. This record could consist, for example, of the flight identifier and its associated price.
The Denodo Kafka Custom Wrapper, using the incremental topic reading mode, can act as a consumer of the information stored in that topic. In this way, you can check the price changes of a flight since the last time they were consulted.
On the other hand, using the Denodo Kafka Custom Wrapper's date range consumption mode on the topic, you can check how flight prices change over time.
The information obtained from the wrapper can be enriched by combining the information stored in the topic with the data of a view that contains the information about flights (flight identifier, airport of origin, airport of destination, date of departure, duration, etc.). This may be done through a JOIN clause.
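The enrichment described above can be sketched as a plain join between the topic's price events and a flight details view (the field names are illustrative, not part of this guide):

```python
def enrich(price_events, flights):
    # Join each price event from the topic with the flight details view
    # on the flight identifier, as a JOIN in Denodo would do.
    details = {f["flight_id"]: f for f in flights}
    return [{**e, **details[e["flight_id"]]}
            for e in price_events if e["flight_id"] in details]

events = [{"flight_id": "IB123", "price": 89.9}]
flights = [{"flight_id": "IB123", "origin": "MAD", "destination": "LHR"}]
print(enrich(events, flights))
# [{'flight_id': 'IB123', 'price': 89.9, 'origin': 'MAD', 'destination': 'LHR'}]
```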
Finally, to keep the cache of that join view updated, a Denodo Scheduler job of type VDPCache may be used to preload the cache of the cached views at regular intervals (every day, for instance).
Depending on the consumption mode used by the wrapper, the data can be applied in different contexts. For example, the data collected through the date range reading mode can be very useful in Business Intelligence/Reporting Analytics applications, while, if the incremental reading mode is used, the data may be applicable in environments such as enterprise applications, web pages, etc.
In the following image you can see a general description of the air tickets use case.
Symptom
Importing the custom wrapper produces a Java Heap Space error:
Resolution
Increase the memory allocated to the VDP Admin Tool. To do so, follow these steps:
In this example, the memory has been increased to 2048 MB.
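For instance, the relevant JVM options would look like the fragment below (an illustrative sketch; the exact location of the Admin Tool's JVM settings depends on your Denodo installation):

```properties
# -Xmx sets the maximum heap size; 2048m matches the example above
-Xms512m -Xmx2048m
```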
Symptom
org.apache.kafka.common.network.SslTransportLayer [] - Failed to send SSL Close message
javax.net.ssl.SSLHandshakeException: DHPublicKey does not comply to algorithm constraints
Resolution
The problem arises when the wrapper uses its SSL capabilities and the server only supports weak ciphers. There are two possible solutions:
Locate the following entry:
jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, DH keySize < 1024, \
EC keySize < 224, DES40_CBC, RC4_40
And replace it with:
jdk.tls.disabledAlgorithms=SSLv3, RC4, MD5withRSA, \
EC keySize < 224, DES40_CBC, RC4_40
That is, 'DH keySize < 1024' must be deleted.
! Note

The first solution is more secure, so we recommend it over the second one.
Kafka official page:
Kafka & Confluent:
JSON Schema:
Authentication using Kerberos:
Encryption with SSL
Encryption and Authentication with SSL
From MapR documentation: “MapR Event Store is a global publish-subscribe event streaming system for big data.”
As the Kafka API provides access to MapR Event Store, you can use the Kafka Custom Wrapper to connect to MapR Streams. This section explains how to do that.
To connect to the MapR cluster you need to install the MapR Client on your client machine (where the VDP server is running):
Set the $MAPR_HOME environment variable to the directory where the MapR Client was installed. If the MAPR_HOME environment variable is not defined, /opt/mapr is the default path.
Copy mapr-clusters.conf from the MapR cluster to the $MAPR_HOME/conf folder in the VDP machine.
demo.mapr.com secure=true maprdemo:7222
Every user who wants to access a secure cluster must have a MapR ticket (maprticket_<username>) in the temporary directory (the default location).
Use the $MAPR_HOME/maprlogin command line tool to generate one:
C:\opt\mapr\bin>maprlogin.bat password -user mapr
[Password for user 'mapr' at cluster 'demo.mapr.com': ]
MapR credentials of user 'mapr' for cluster 'demo.mapr.com' are written to
'C:\Users\<username>\AppData\Local\Temp/maprticket_<username>'
Add -Dmapr.library.flatclass to the VDP Server JVM options.
VDP Server JVM options
Otherwise, VDP will throw the exception java.lang.UnsatisfiedLinkError from JNISecurity.SetParsingDone() while executing the Kafka Custom Wrapper.
In order to use the MapR vendor libraries, you should not import the Kafka Custom Wrapper into Denodo.
You have to create the custom data source using the ‘Classpath’ parameter instead of the ‘Select Jars’ option. Click Browse to select the directory containing the required dependencies for this custom wrapper:
The MapR Maven repository is located at http://repository.mapr.com/maven/. The names of the JAR files that you must use contain the versions of Hadoop, Kafka, ZooKeeper and MapR that you are using:
As the MapRClient native library is bundled in the maprfs-<mapr_version> jar, you should use the maprfs jar that comes with the previously installed MapR Client, since the native library is operating-system dependent.
! Important

The MapR native library is included in these custom wrapper dependencies and can be loaded only once. Therefore, if you plan to access other MapR sources with Denodo, like:

you have to use the same classpath to configure all the custom wrappers and the JDBC driver; see 'C:\Work\MapR Certification\mapr-lib' in the image above. With this configuration Denodo can reuse the same classloader and load the native library only once.
Configure the Kafka wrapper parameters as usual, with these main differences:
MapR base view edition