USER MANUALS

Kafka リスナー

Apache Kafka は分散イベントストアであり、ストリーム処理プラットフォームです。これを使用すると、アプリケーションで Kafka サーバーをサブスクライブしてデータを受信したり、任意の数のシステムやリアルタイムのアプリケーションにデータを公開したりすることができます。

Virtual DataPort では、 Kafka リスナー を作成し、Kafka サーバーで生成されたデータをサブスクライブできます。イベントが Kafka リスナーに達すると、リスナーはクエリを実行し、オプションで結果を Kafka トピックに公開します。Kafka リスナーを定義する場合、以下を行うようにリスナーを構成します。

  1. Kafka サーバーから受信した VQL ステートメントを実行する。

  2. あるいは、Kafka リスナーで補間変数 (@LISTENEREXPRESSION) を使用してクエリを定義し、実行時にこの変数を Kafka サーバーから受信した値で置き換える。

クエリの結果を Kafka トピックに公開するようにリスナーを構成することもできます。出力は XML ドキュメント、JSON ドキュメント、またはバイナリになります。

オプション (a) の動作例: クライアントは、 SELECT * FROM internet_inc WHERE iinc_id=1 などのメッセージを Kafka サーバーに送信します。Kafka サーバーは、このメッセージを Virtual DataPort に転送します。Virtual DataPort は応答を返しますが、選択した出力が XML の場合は、「 XML response message sent by a Kafka listener 」のようになり、JSON の場合は、「 JSON response message sent by a Kafka listener 」のようになります。

Kafka リスナーが送信する XML 応答メッセージ
<?xml version="1.0" encoding="UTF-8"?>
<response>
    <item>
        <iinc_id>1.00</iinc_id>
        <summary>Error in ADSL router</summary>
        <ttime>29-jun-2005 19h 19m 41s</ttime>
        <taxid>B78596011</taxid>
        <specific_field1>1</specific_field1>
        <specific_field2>1</specific_field2>
    </item>
</response>
Kafka リスナーが送信する JSON 応答メッセージ
[{
   "IINC_ID": 1,
   "SUMMARY": "Error in ADSL router",
   "TTIME": "2005-06-29",
   "TAXID": "B78596011",
   "SPECIFIC_FIELD1": "1",
   "SPECIFIC_FIELD2": "1"
}]

リクエストが ALTER VIEW incidents CACHE INVALIDATE のような DML 文である場合、応答は空です (「 XML response message to a DML query 」と「 JSON response message to a DML query 」を参照)。

DML クエリに対する XML 応答メッセージ
<?xml version="1.0" encoding="UTF-8"?>
<response />
DML クエリに対する JSON 応答メッセージ
[]

オプション (b) の動作例: 次のクエリを使用していて、リスナーを作成したとします。

Kafka リスナーで定義したサンプルクエリ
SELECT *
FROM incidents
WHERE taxid = '@LISTENEREXPRESSION'

リスナーが B78596014 のような値を受信すると、サーバーは以下のクエリを実行します。

Kafka リスナーで定義したサンプルクエリ
SELECT *
FROM incidents
WHERE taxid = 'B78596014'

リスナーで定義されているクエリに @LISTENEREXPRESSION 変数が含まれない場合、リスナーから受信した値は無効にされて、クエリは「そのままの形で」実行されます。

高度な Kafka 構成

リスナーが実行するクエリのタイムアウトとチャンクのタイムアウトを構成できます。つまり、リスナーが Virtual DataPort に対して開く接続でクエリを実行します。

  • クエリのタイムアウト: リスナーがクエリの終了を待機する最大時間 (ミリ秒)。ゼロ (0) の場合、クエリが終了するまで無期限に待機します。

    デフォルト値: 900000 ミリ秒 (15 分)

  • チャンクのタイムアウト: サーバーがリスナーに新しいデータブロックを返すまでの最大待機時間。この時間を超えると、サーバーはクエリが終了していなくても現在のブロックを送信します。

    デフォルト値:90000 ミリ秒 (90 秒)

このオプションの構成方法の例。
SET 'kafkalistener.queryTimeout'='500000'; -- 500 seconds

SET 'kafkalistener.chunkTimeout'='50000'; -- 50 seconds

Virtual DataPort を再起動して、この変更を有効にします。

Add feedback