Kafka Listeners
Apache Kafka is a distributed event store and stream-processing platform. It allows applications to subscribe to a Kafka server to receive data, and to publish data to any number of systems or real-time applications.
In Virtual DataPort you can create a Kafka listener to subscribe to data that originates in a Kafka server. When an event reaches a Kafka listener, the listener executes a query and, optionally, publishes the result to a Kafka topic. When you define a Kafka listener, you configure it to do one of the following:
a) Execute the VQL statements received from the Kafka server.
b) Define a query in the Kafka listener that contains the interpolation variable @LISTENEREXPRESSION. At runtime, the listener replaces this variable with the value received from the Kafka server.
You can also configure the listener to publish the result of the queries to a Kafka topic. The output can be an XML document, a JSON document, or binary data.
Example of how option a) works: a client sends a message such as
SELECT * FROM internet_inc WHERE iinc_id=1
to the Kafka server. The Kafka server forwards it to Virtual DataPort, which sends back a response such as the first listing below (XML response message sent by a Kafka listener) if the selected output is XML, or the second listing (JSON response message sent by a Kafka listener) if the selected output is JSON.
<?xml version="1.0" encoding="UTF-8"?>
<response>
  <item>
    <iinc_id>1.00</iinc_id>
    <summary>Error in ADSL router</summary>
    <ttime>29-jun-2005 19h 19m 41s</ttime>
    <taxid>B78596011</taxid>
    <specific_field1>1</specific_field1>
    <specific_field2>1</specific_field2>
  </item>
</response>
[{
"IINC_ID": 1,
"SUMMARY": "Error in ADSL router",
"TTIME": "2005-06-29",
"TAXID": "B78596011",
"SPECIFIC_FIELD1": "1",
"SPECIFIC_FIELD2": "1"
}]
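An application that consumes these messages from the output topic has to turn them back into rows. The following is a minimal Python sketch using only the standard library, not part of Virtual DataPort: the function names `parse_xml_response` and `parse_json_response` are hypothetical, and it assumes the message value arrives as raw bytes in the shape shown above (one `<item>` element per row inside `<response>` for XML, an array of objects for JSON).

```python
import json
import xml.etree.ElementTree as ET
from typing import Any, Dict, List


def parse_xml_response(value: bytes) -> List[Dict[str, str]]:
    """Convert <response><item>...</item></response> into a list of rows.

    Assumes the layout of the XML listing above; every value stays a string.
    """
    root = ET.fromstring(value)
    if root.tag != "response":
        raise ValueError(f"unexpected root element <{root.tag}>")
    rows = []
    for item in root.findall("item"):
        # Each child element of <item> is one field; its text is the value.
        rows.append({child.tag: (child.text or "") for child in item})
    return rows


def parse_json_response(value: bytes) -> List[Dict[str, Any]]:
    """A JSON response is already an array of row objects."""
    rows = json.loads(value)
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array of rows")
    return rows
```

The XML format carries every value as text (`1.00`), whereas the JSON format keeps numbers as numbers (`1`), so a consumer that needs typed values must convert XML fields itself. An empty response, such as the one returned for a DML statement, parses to an empty list with either function.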
If the request is a DML statement such as
ALTER VIEW incidents CACHE INVALIDATE
the response is empty, as in the following XML and JSON response messages to a DML query:
<?xml version="1.0" encoding="UTF-8"?>
<response />
[]
Example of how option b) works: suppose you have created a listener with the following query:
SELECT *
FROM incidents
WHERE taxid = '@LISTENEREXPRESSION'
When the listener receives a value such as B78596014, the Server executes the following query:
SELECT *
FROM incidents
WHERE taxid = 'B78596014'
If the query defined in the listener does not contain the @LISTENEREXPRESSION variable, the value received by the listener is ignored and the query is executed “as is”.
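The decision the listener makes for each incoming message, covering option a), option b) and the “as is” rule, can be modeled in a few lines. This is a minimal Python sketch, not the Server's implementation: `resolve_query` is a hypothetical name, and it assumes every occurrence of the variable is replaced by plain text substitution. It does not model any quoting or escaping the Server may or may not apply to the received value.

```python
from typing import Optional

LISTENER_EXPRESSION = "@LISTENEREXPRESSION"


def resolve_query(message: str, listener_query: Optional[str] = None) -> str:
    """Return the VQL the listener would run for one received message."""
    if listener_query is None:
        # Option a): the message itself is the VQL statement to execute.
        return message
    if LISTENER_EXPRESSION not in listener_query:
        # No variable in the listener query: the message is ignored and
        # the query runs "as is".
        return listener_query
    # Option b): assumption, plain text substitution of every occurrence,
    # with no escaping of quotes contained in the message.
    return listener_query.replace(LISTENER_EXPRESSION, message)
```

For example, `resolve_query("B78596014", "SELECT * FROM incidents WHERE taxid = '@LISTENEREXPRESSION'")` yields the query with `'B78596014'` in place of the variable, matching the example above.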
Advanced Kafka Configuration
You can configure the query timeout and the chunk timeout of the queries that the listeners execute, that is, of the connection that each listener opens to Virtual DataPort to run its queries:
Query timeout: maximum time (in milliseconds) the listener will wait for the query to finish. If zero (0), it will wait indefinitely until the query finishes.
Default value: 900000 (15 minutes)
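Chunk timeout: maximum time (in milliseconds) that the Server waits before returning a new block of data to the listener. When this time is exceeded, the Server sends the current block even if the query has not finished.
Default value: 90000 (90 seconds)
For example, to set the query timeout to 500 seconds and the chunk timeout to 50 seconds, execute these VQL statements: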
SET 'kafkalistener.queryTimeout'='500000'; -- 500 seconds
SET 'kafkalistener.chunkTimeout'='50000'; -- 50 seconds
Restart Virtual DataPort for these changes to take effect.
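Both properties take milliseconds, so it is easy to be off by a factor of 1000 when writing the values by hand. The following hypothetical Python helper (`kafka_listener_timeout_statements` is not part of any Denodo tool) only builds the two `SET` statements shown above from `timedelta` values; you still have to execute them in Virtual DataPort and restart it.

```python
from datetime import timedelta
from typing import List


def kafka_listener_timeout_statements(query_timeout: timedelta,
                                      chunk_timeout: timedelta) -> List[str]:
    """Build the VQL SET statements for the Kafka listener timeouts.

    Durations are converted to whole milliseconds (sub-millisecond parts
    are truncated). A query timeout of zero means "wait indefinitely".
    """
    statements = []
    for prop, value in (("kafkalistener.queryTimeout", query_timeout),
                        ("kafkalistener.chunkTimeout", chunk_timeout)):
        ms = value // timedelta(milliseconds=1)  # timedelta // timedelta -> int
        if ms < 0:
            raise ValueError(f"{prop} must not be negative")
        statements.append(f"SET '{prop}'='{ms}';")
    return statements
```

Calling it with `timedelta(seconds=500)` and `timedelta(seconds=50)` produces the same two statements as the example above.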