Defining Kafka Listeners¶
Virtual DataPort can subscribe to a Kafka server to listen to requests. Therefore, clients, instead of connecting to Virtual DataPort via JDBC, ODBC or a Web service, can send a request to a Kafka server, which forwards it to Virtual DataPort. Then, the response is sent back to a topic of the Kafka server, which forwards it to the client/s.
When creating a Kafka listener, you can set it up to:
Execute the VQL statements received from a Kafka server.
Or, define a query with the interpolation variable (
@LISTENEREXPRESSION
) in the Kafka listener and at runtime, replace this variable with the values received from the Kafka server.
The output of a Kafka listener can be either an XML document, a JSON document or a Binary document.
Example of how the option a) works: a client sends a message such as
SELECT * FROM internet_inc WHERE iinc_id = 1
to the Kafka server.
The Kafka server will forward this to Virtual DataPort, which will send a
response back like XML response message sent by a Kafka listener if the
selected output is XML, or like JSON response message sent by a Kafka
listener if the selected output is JSON or like Binary response message sent by a Kafka listener.
<?xml version="1.0" encoding="UTF-8"?>
<response>
<item>
<iinc_id>1.00</iinc_id>
<summary>Error in ADSL router</summary>
<ttime>29-jun-2005 19h 19m 41s</ttime>
<taxid>B78596011</taxid>
<specific_field1>1</specific_field1>
<specific_field2>1</specific_field2>
</item>
</response>
[
{
"IINC_ID": 1,
"SUMMARY": "Error in ADSL router",
"TTIME": "2005-06-29",
"TAXID": "B78596011",
"SPECIFIC_FIELD1": "1",
"SPECIFIC_FIELD2": "1"
}
]
If the request is a DML sentence such as
ALTER VIEW incidents CACHE INVALIDATE
, the response will be empty
(see XML response message to a DML query and JSON response message
to a DML query).
<?xml version="1.0" encoding="UTF-8"?>
<response />
[]
Example of how option b) works: if you have created a Kafka listener with the following query:
SELECT * FROM incidents
WHERE taxid = '@KafkaEXPRESSION'
When the listener receives the value B78596014
, the Server will
execute the following query:
SELECT * FROM incidents
WHERE taxid = 'B78596014'
If the query defined in the Kafka listener does not contain the
@LISTENEREXPRESSION
variable, the value received from the listener is ignored and the query is executed “as is”.
The following figures contain the syntax of the various commands to deal with Kafka listeners.
CREATE [ OR REPLACE ] LISTENER KAFKA <name:identifier> [ID = <id:literal> ]
GROUPID = <name:literal>
INPUTTOPIC = <name:literal>
[ REPLYTOPIC = <name:literal> ]
[ IGNOREREPLYTOPIC ]
[ INPUT = { PLAIN | BINARY } ]
[ OUTPUT = { JSON | XML | BINARY } ]
[ VDPDATABASE = <name:literal> ]
VDPUSER = <name:literal>
[ MESSAGESCONTAINVARIABLE = { TRUE | FALSE } QUERY = <query:literal> ]
ENABLED = { TRUE | FALSE }
AUTHENTICATION = { NONE | SASLPLAINTEXT | SASLGSSAPI }
[ SSL = { TRUE | FALSE } ]
KAFKASERVER = <name:literal>
[ CHUNKSIZE = <chunksize:literal> ]
[ DLQ = <dql:literal> ]
PROPERTIES ( <property> [, <property> ]* )
[ FOLDER = <folder:literal> ]
[ DESCRIPTION = <description:literal> ]
<property> ::=
<key:literal> = <value:literal>
GROUPID
is used to establish a group id so that multiple listeners make load balancing just in case they belong to the same consumer.Depending on the vendor of the Kafka server, we might have to create the Kafka destination, or it will be created automatically when the listener tries to subscribe to it.
INPUTTOPIC
is used to establish from where Kafka listener will receive messages.REPLYTOPIC
is the name of the Kafka queue or topic where the responses will be sent to. Required ifIGNOREREPLYTOPIC
is not present.If the clause
IGNOREREPLYTOPIC
is present, the listener will never send a response.INPUT
. If the parameter isPLAIN
, it will acceptJSON
,XML
or Plain values and if it isBINARY
it will accept byte[].OUTPUT
. If the parameter isXML
the output of the listener will be an XML document. If it isJSON
, a JSON document and if it isBINARY
, a BINARY document.BINARY
has a limitation of 1 row and 1 column response. Parameter required ifIGNOREREPLYTOPIC
is not present.VDPUSER
is the username that Virtual DataPort will use to check if the listener has enough privileges to execute a query. E.g. if our Virtual DataPort server has two users:admin
: is an “administrator” or a user promoted to “local administrator” of the database so it can access any view of the database.user1
: is a “normal user” that only hasEXECUTE
privileges over this database.
If the parameter
VDPUSER
is “admin”, the listener will be able to execute any query. However, ifVDPUSER
isuser1
, theCREATE
,DROP
,INSERT
,UPDATE
andDELETE
queries will fail becauseuser1
only hasEXECUTE
privileges.MESSAGESCONTAINVARIABLE
andQUERY
: if present andMESSAGESCONTAINVARIABLE
istrue
, when the listener receives a Kafka message, the Server will replace the variable@LISTENEREXPRESSION
ofQUERY
with the content of the Kafka message. Then, the Server will execute the resulting query.ENABLED
.TRUE
to enable the listener. That is, after creating it, the listener will try to connect to the Kafka server to listen to requests.FALSE
, to disable it.AUTHENTICATION
. You can specify 3 different authentication methods:NONE
: the listener will connect to the Kafka server using the PLAINTEXT protocol.SASLPLAINTEXT
: the listener will connect using the protocol SASLPLAINTEXT.This require to specify a username and password.
SASLGSSAPI
: the listener will connect using the Kerberos protocol. You have to provide the principal, the location to the keytab file that contains the credential to connect to this server (you need to upload this file to the computer where Virtual DataPort runs) and the service name.
KAFKASERVER
: IP and port of the Kafka server.PROPERTIES
. List of properties that will be used to obtain a connection to the Kafka server. The properties will follow a format key = value where key is the name of the property. It is required to always associate a value to the property.FOLDER
. Path of the folder where the listener will be stored. E.g.:FOLDER = '/Kafka listeners'
.DESCRIPTION
. Description of the listener.
ALTER LISTENER Kafka <name:identifier>
ENABLED EQ { true \| false }
If ENABLED = TRUE
and it was FALSE
, it tries to connect to the
Kafka server to listen to requests.
If ENABLED = FALSE
and it was TRUE
, it closes the connection
with the Kafka server.
To rename a Kafka listener, use the statement RENAME
:
RENAME LISTENER KAFKA <name:identifier> TO <new name:identifier>
RENAME LISTENER KAFKA Kafka_listener_order TO Kafka_order_full;