Kafka リスナー¶
Apache Kafka は分散イベントストアであり、ストリーム処理プラットフォームです。これを使用すると、アプリケーションで Kafka サーバーをサブスクライブしてデータを受信したり、任意の数のシステムやリアルタイムのアプリケーションにデータを公開したりすることができます。
Virtual DataPort では、 Kafka リスナー を作成し、Kafka サーバーで生成されたデータをサブスクライブできます。イベントが Kafka リスナーに達すると、リスナーはクエリを実行し、オプションで結果を Kafka トピックに公開します。Kafka リスナーを定義する場合、以下を行うようにリスナーを構成します。
Kafka サーバーから受信した VQL ステートメントを実行する。
あるいは、Kafka リスナーで補間変数 (
@LISTENEREXPRESSION
) を使用してクエリを定義し、実行時にこの変数を Kafka サーバーから受信した値で置き換える。
クエリの結果を Kafka トピックに公開するようにリスナーを構成することもできます。出力は XML ドキュメント、JSON ドキュメント、またはバイナリになります。
オプション (a) の動作例: クライアントは、 SELECT * FROM internet_inc WHERE iinc_id=1
などのメッセージを Kafka サーバーに送信します。Kafka サーバーは、このメッセージを Virtual DataPort に転送します。Virtual DataPort は応答を返しますが、選択した出力が XML の場合は、「 XML response message sent by a Kafka listener 」のようになり、JSON の場合は、「 JSON response message sent by a Kafka listener 」のようになります。
<?xml version="1.0" encoding="UTF-8"?>
<response>
<item>
<iinc_id>1.00</iinc_id>
<summary>Error in ADSL router</summary>
<ttime>29-jun-2005 19h 19m 41s</ttime>
<taxid>B78596011</taxid>
<specific_field1>1</specific_field1>
<specific_field2>1</specific_field2>
</item>
</response>
[{
"IINC_ID": 1,
"SUMMARY": "Error in ADSL router",
"TTIME": "2005-06-29",
"TAXID": "B78596011",
"SPECIFIC_FIELD1": "1",
"SPECIFIC_FIELD2": "1"
}]
リクエストが ALTER VIEW incidents CACHE INVALIDATE
のような DML 文である場合、応答は空です (「 XML response message to a DML query 」と「 JSON response message to a DML query 」を参照)。
<?xml version="1.0" encoding="UTF-8"?>
<response />
[]
オプション (b) の動作例: 次のクエリを使用していて、リスナーを作成したとします。
SELECT *
FROM incidents
WHERE taxid = '@LISTENEREXPRESSION'
リスナーが B78596014
のような値を受信すると、サーバーは以下のクエリを実行します。
SELECT *
FROM incidents
WHERE taxid = 'B78596014'
リスナーで定義されているクエリに @LISTENEREXPRESSION
変数が含まれない場合、リスナーから受信した値は無効にされて、クエリは「そのままの形で」実行されます。
高度な Kafka 構成¶
リスナーが実行するクエリのタイムアウトとチャンクのタイムアウトを構成できます。つまり、リスナーが Virtual DataPort に対して開く接続でクエリを実行します。
クエリのタイムアウト: リスナーがクエリの終了を待機する最大時間 (ミリ秒)。ゼロ (0) の場合、クエリが終了するまで無期限に待機します。
デフォルト値: 900000 ミリ秒 (15 分)
チャンクのタイムアウト: サーバーがリスナーに新しいデータブロックを返すまでの最大待機時間。この時間を超えると、サーバーはクエリが終了していなくても現在のブロックを送信します。
デフォルト値:90000 ミリ秒 (90 秒)
SET 'kafkalistener.queryTimeout'='500000'; -- 500 seconds
SET 'kafkalistener.chunkTimeout'='50000'; -- 50 seconds
Virtual DataPort を再起動して、この変更を有効にします。