Defining Kafka Listeners

Virtual DataPort can subscribe to a Kafka server to listen for requests. Instead of connecting to Virtual DataPort via JDBC, ODBC or a Web service, clients can send a request to a Kafka server, which forwards it to Virtual DataPort. The response is then sent back to a topic of the Kafka server, which forwards it to the client or clients.

When creating a Kafka listener, you can set it up to:

  • Execute the VQL statements received from a Kafka server.

  • Or define a query containing the interpolation variable @LISTENEREXPRESSION in the Kafka listener; at runtime, this variable is replaced with the value received from the Kafka server.

The output of a Kafka listener can be an XML document, a JSON document or a binary document.

Example of how option a) (executing the received VQL statements) works: a client sends a message such as SELECT * FROM internet_inc WHERE iinc_id = 1 to the Kafka server. The Kafka server forwards it to Virtual DataPort, which sends back a response like the one in “XML response message sent by a Kafka listener” if the selected output is XML, like the one in “JSON response message sent by a Kafka listener” if the selected output is JSON, or a binary message if the selected output is BINARY.

XML response message sent by a Kafka listener
<?xml version="1.0" encoding="UTF-8"?>
<response>
    <item>
        <iinc_id>1.00</iinc_id>
        <summary>Error in ADSL router</summary>
        <ttime>29-jun-2005 19h 19m 41s</ttime>
        <taxid>B78596011</taxid>
        <specific_field1>1</specific_field1>
        <specific_field2>1</specific_field2>
    </item>
</response>
JSON response message sent by a Kafka listener
[
    {
        "IINC_ID": 1,
        "SUMMARY": "Error in ADSL router",
        "TTIME": "2005-06-29",
        "TAXID": "B78596011",
        "SPECIFIC_FIELD1": "1",
        "SPECIFIC_FIELD2": "1"
    }
]

If the request is a DML statement such as ALTER VIEW incidents CACHE INVALIDATE, the response will be empty (see “XML response message to a DML query” and “JSON response message to a DML query”).

XML response message to a DML query
<?xml version="1.0" encoding="UTF-8"?>
<response />
JSON response message to a DML query
[]
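
On the client side, the response formats shown above can be read with standard parsers. The following is a minimal Python sketch and not part of Virtual DataPort. It assumes the client has already read the message value from the reply topic as bytes; the helper names rows_from_xml and rows_from_json are hypothetical. The payloads are the samples from this section.

```python
import json
import xml.etree.ElementTree as ET

# Sample payloads copied from this section (Kafka message values are bytes).
XML_RESPONSE = b"""<?xml version="1.0" encoding="UTF-8"?>
<response>
    <item>
        <iinc_id>1.00</iinc_id>
        <summary>Error in ADSL router</summary>
        <ttime>29-jun-2005 19h 19m 41s</ttime>
        <taxid>B78596011</taxid>
        <specific_field1>1</specific_field1>
        <specific_field2>1</specific_field2>
    </item>
</response>"""

JSON_RESPONSE = b"""[
    {
        "IINC_ID": 1,
        "SUMMARY": "Error in ADSL router",
        "TTIME": "2005-06-29",
        "TAXID": "B78596011",
        "SPECIFIC_FIELD1": "1",
        "SPECIFIC_FIELD2": "1"
    }
]"""


def rows_from_xml(payload: bytes) -> list:
    """Return one dict per <item> element; field values stay as strings."""
    root = ET.fromstring(payload)
    return [
        {field.tag: (field.text or "") for field in item}
        for item in root.findall("item")
    ]


def rows_from_json(payload: bytes) -> list:
    """Return the JSON array of row objects as a list of dicts."""
    return json.loads(payload)


if __name__ == "__main__":
    print(rows_from_xml(XML_RESPONSE))
    print(rows_from_json(JSON_RESPONSE))
```

With the empty responses shown above (`<response />` and `[]`), both helpers return an empty list, so a client can treat "no rows" and "statement without a result" the same way.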

Example of how option b) works: suppose you have created a Kafka listener with the following query:

Sample query defined in a Kafka listener
SELECT * FROM incidents
WHERE taxid = '@LISTENEREXPRESSION'

When the listener receives the value B78596014, the Server will execute the following query:

SELECT * FROM incidents
WHERE taxid = 'B78596014'

If the query defined in the Kafka listener does not contain the @LISTENEREXPRESSION variable, the value received by the listener is ignored and the query is executed “as is”.
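
The substitution described above can be illustrated with a few lines of Python. This is only a sketch of the documented behavior, not the Server's implementation: the function name build_query is hypothetical, and the sketch does not escape or validate the message value in any way.

```python
PLACEHOLDER = "@LISTENEREXPRESSION"


def build_query(listener_query: str, message: str) -> str:
    """Return the query that would run for one incoming Kafka message."""
    # str.replace returns the query unchanged when the placeholder is absent,
    # which mirrors the "executed as is" case.
    return listener_query.replace(PLACEHOLDER, message)


if __name__ == "__main__":
    query = "SELECT * FROM incidents\nWHERE taxid = '@LISTENEREXPRESSION'"
    print(build_query(query, "B78596014"))
    # A query without the variable ignores the received value.
    print(build_query("SELECT * FROM incidents", "B78596014"))
```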

The following listings show the syntax of the commands that manage Kafka listeners.

Syntax of the CREATE LISTENER KAFKA statement
CREATE [ OR REPLACE ] LISTENER KAFKA <name:identifier> [ ID = <id:literal> ]
    GROUPID = <name:literal>
    INPUTTOPIC = <name:literal>
    [ REPLYTOPIC = <name:literal> ]
    [ IGNOREREPLYTOPIC ]
    [ INPUT = { PLAIN | BINARY } ]
    [ OUTPUT = { JSON | XML | BINARY } ]
    [ VDPDATABASE = <name:literal> ]
    VDPUSER = <name:literal>
    [ MESSAGESCONTAINVARIABLE = { TRUE | FALSE } QUERY = <query:literal> ]
    ENABLED = { TRUE | FALSE }
    AUTHENTICATION = { NONE | SASLPLAINTEXT | SASLGSSAPI }
    KAFKASERVER = <name:literal>
    [ CHUNKSIZE = <chunksize:literal> ]
    [ DLQ = <dlq:literal> ]
    PROPERTIES ( <property> [, <property> ]* )
    [ FOLDER = <folder:literal> ]
    [ DESCRIPTION = <description:literal> ]

<property> ::=
    <key:literal> = <value:literal>

  • GROUPID sets the consumer group id of the listener. Listeners that share the same group id belong to the same consumer group, so the messages are load-balanced among them.

    Depending on the vendor of the Kafka server, you might have to create the Kafka topic beforehand, or it will be created automatically when the listener tries to subscribe to it.

  • INPUTTOPIC is the name of the Kafka topic from which the listener will receive messages.

  • REPLYTOPIC is the name of the Kafka topic to which the responses will be sent. Required if IGNOREREPLYTOPIC is not present.

    If the clause IGNOREREPLYTOPIC is present, the listener will never send a response.

  • INPUT. If the parameter is PLAIN, the listener accepts JSON, XML or plain-text values; if it is BINARY, it accepts byte arrays (byte[]).

  • OUTPUT. If the parameter is XML, the output of the listener will be an XML document; if it is JSON, a JSON document; and if it is BINARY, a binary document. With BINARY, the response is limited to 1 row and 1 column. This parameter is required if IGNOREREPLYTOPIC is not present.

  • VDPUSER is the username that Virtual DataPort will use to check if the listener has enough privileges to execute a query. For example, suppose the Virtual DataPort server has two users:

    • admin: is an “administrator” or a user promoted to “local administrator” of the database so it can access any view of the database.

    • user1: is a “normal user” that only has EXECUTE privileges over this database.

    If the parameter VDPUSER is “admin”, the listener will be able to execute any query. However, if VDPUSER is “user1”, the CREATE, DROP, INSERT, UPDATE and DELETE queries will fail because user1 only has EXECUTE privileges.

  • MESSAGESCONTAINVARIABLE and QUERY: if both are present and MESSAGESCONTAINVARIABLE is TRUE, when the listener receives a Kafka message, the Server will replace the variable @LISTENEREXPRESSION of QUERY with the content of the Kafka message. Then, the Server will execute the resulting query.

  • ENABLED. TRUE enables the listener: after it is created, the listener will try to connect to the Kafka server to listen for requests. FALSE disables it.

  • AUTHENTICATION. You can specify one of three authentication methods:

    • NONE: the listener will connect to the Kafka server using the PLAINTEXT protocol.

    • SASLPLAINTEXT: the listener will connect using the protocol SASLPLAINTEXT.

      This requires specifying a username and password.

    • SASLGSSAPI: the listener will connect using the Kerberos protocol. You have to provide the principal, the location of the keytab file that contains the credentials to connect to this server (you need to upload this file to the computer where Virtual DataPort runs) and the service name.

  • KAFKASERVER: IP and port of the Kafka server.

  • PROPERTIES. List of properties that will be used to obtain a connection to the Kafka server. Each property follows the format key = value, where key is the name of the property. Every property must have a value.

  • FOLDER. Path of the folder where the listener will be stored. E.g.: FOLDER = '/Kafka listeners'.

  • DESCRIPTION. Description of the listener.
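
To see how the clauses above fit together, the following Python sketch assembles a CREATE LISTENER KAFKA statement as text and enforces two of the documented rules: REPLYTOPIC and OUTPUT are required unless IGNOREREPLYTOPIC is used, and PROPERTIES needs at least one entry. Everything beyond the documented syntax is an assumption made for illustration: the helper names, the sample topic and user names, the 'host:port' form of KAFKASERVER, the doubling of single quotes inside literals, and the use of the Kafka consumer setting auto.offset.reset as a property key.

```python
def lit(value: str) -> str:
    # Assumption: string literals use single quotes, doubled to escape.
    return "'" + value.replace("'", "''") + "'"


def create_listener_vql(name, group_id, input_topic, vdp_user, kafka_server,
                        properties, reply_topic=None, output=None,
                        authentication="NONE", enabled=True):
    """Render a CREATE LISTENER KAFKA statement in the documented clause order."""
    if not properties:
        raise ValueError("the syntax requires at least one property")
    clauses = [f"GROUPID = {lit(group_id)}", f"INPUTTOPIC = {lit(input_topic)}"]
    if reply_topic is None:
        # Without a reply topic the listener never sends a response.
        clauses.append("IGNOREREPLYTOPIC")
    else:
        if output not in ("JSON", "XML", "BINARY"):
            raise ValueError("OUTPUT is required unless IGNOREREPLYTOPIC is used")
        clauses += [f"REPLYTOPIC = {lit(reply_topic)}", f"OUTPUT = {output}"]
    props = ", ".join(f"{lit(k)} = {lit(v)}" for k, v in properties.items())
    clauses += [
        f"VDPUSER = {lit(vdp_user)}",
        f"ENABLED = {'TRUE' if enabled else 'FALSE'}",
        f"AUTHENTICATION = {authentication}",
        f"KAFKASERVER = {lit(kafka_server)}",
        f"PROPERTIES ( {props} )",
    ]
    return f"CREATE OR REPLACE LISTENER KAFKA {name}\n    " + "\n    ".join(clauses) + ";"


if __name__ == "__main__":
    # Hypothetical values; adapt them to your own Kafka server and users.
    print(create_listener_vql(
        name="kafka_listener_order",
        group_id="order_group",
        input_topic="vdp_requests",
        reply_topic="vdp_responses",
        output="JSON",
        vdp_user="user1",
        kafka_server="localhost:9092",
        properties={"auto.offset.reset": "earliest"},
    ))
```

Omitting reply_topic produces a statement with IGNOREREPLYTOPIC instead of REPLYTOPIC and OUTPUT. Optional clauses such as INPUT, VDPDATABASE, QUERY, CHUNKSIZE, DLQ, FOLDER and DESCRIPTION are left out of the sketch to keep it short.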

Command to enable/disable a Kafka listener: ALTER LISTENER KAFKA
ALTER LISTENER KAFKA <name:identifier>
    ENABLED = { TRUE | FALSE }

If ENABLED is set to TRUE and the listener was disabled, the listener tries to connect to the Kafka server to listen for requests.

If ENABLED is set to FALSE and the listener was enabled, the listener closes its connection with the Kafka server.

To rename a Kafka listener, use the statement RENAME:

Syntax of the statement RENAME LISTENER KAFKA
RENAME LISTENER KAFKA <name:identifier> TO <new name:identifier>
Example
RENAME LISTENER KAFKA Kafka_listener_order TO Kafka_order_full;