Kafka リスナーの定義¶
Virtual DataPort では Kafka サーバーにサブスクライブして、リクエストを待機できます。そのため、クライアントは、JDBC、ODBC、または Web サービスを介して Virtual DataPort に接続する代わりに、Kafka サーバーにリクエストを送信できます。Kafka サーバーは、受け取ったリクエストを Virtual DataPort に転送します。その後、Virtual DataPort からの応答が Kafka サーバーのトピックに返され、クライアントに転送されます。
Kafka リスナーを作成するときは、次の処理を実行できるよう設定できます。
Kafka サーバーから受信した VQL ステートメントを実行する。
あるいは、Kafka リスナーで補間変数 (
@LISTENEREXPRESSION) を使用してクエリを定義し、実行時にこの変数を Kafka サーバーから受信した値で置き換える。
Kafka リスナーの出力は、XML ドキュメント、JSON ドキュメント、Binary ドキュメントのいずれかを選択できます。
オプション (a) の動作例: クライアントは、 SELECT * FROM internet_inc WHERE iinc_id=1 などのメッセージを Kafka サーバーに送信します。Kafka サーバーは、このメッセージを Virtual DataPort に転送します。Virtual DataPort は応答を返しますが、選択した出力が XML の場合は、「 XML response message sent by a Kafka listener 」、JSON の場合は、「 JSON response message sent by a Kafka listener 」、あるいは「 Binary response message sent by a Kafka listener 」のようになります。
<?xml version="1.0" encoding="UTF-8"?>
<response>
<item>
<iinc_id>1.00</iinc_id>
<summary>Error in ADSL router</summary>
<ttime>29-jun-2005 19h 19m 41s</ttime>
<taxid>B78596011</taxid>
<specific_field1>1</specific_field1>
<specific_field2>1</specific_field2>
</item>
</response>
[
{
"IINC_ID": 1,
"SUMMARY": "Error in ADSL router",
"TTIME": "2005-06-29",
"TAXID": "B78596011",
"SPECIFIC_FIELD1": "1",
"SPECIFIC_FIELD2": "1"
}
]
リクエストが ALTER VIEW incidents CACHE INVALIDATE などの DML センテンスの場合、応答は空になります (「 XML response message to a DML query 」および「 JSON response message to a DML query 」を参照)。
<?xml version="1.0" encoding="UTF-8"?>
<response />
[]
オプション (b) の動作例: 次のクエリを使用していて Kafka リスナーを作成したとします。
SELECT * FROM incidents
WHERE taxid = '@KafkaEXPRESSION'
リスナーが値 B78596014 を受信すると、サーバーは次のクエリを実行します。
SELECT * FROM incidents
WHERE taxid = 'B78596014'
Kafka リスナーで定義したクエリに @LISTENEREXPRESSION 変数が含まれていない場合、リスナーから受信した値は無視され、クエリは「そのまま」実行されます。
次の図には、Kafka リスナーを扱うさまざまなコマンドの構文が含まれています。
CREATE [ OR REPLACE ] LISTENER KAFKA <name:identifier> [ID = <id:literal> ]
GROUPID = <name:literal>
INPUTTOPIC = <name:literal>
[ REPLYTOPIC = <name:literal> ]
[ IGNOREREPLYTOPIC ]
[ INPUT = { PLAIN | BINARY } ]
[ OUTPUT = { JSON | XML | BINARY } ]
[ VDPDATABASE = <name:literal> ]
VDPUSER = <name:literal>
[ MESSAGESCONTAINVARIABLE = { TRUE | FALSE } QUERY = <query:literal> ]
ENABLED = { TRUE | FALSE }
AUTHENTICATION = { NONE | SASLPLAINTEXT | SASLGSSAPI }
[ SSL = { TRUE | FALSE } ]
KAFKASERVER = <name:literal>
[ CHUNKSIZE = <chunksize:literal> ]
[ DLQ = <dql:literal> ]
PROPERTIES ( <property> [, <property> ]* )
[ FOLDER = <folder:literal> ]
[ DESCRIPTION = <description:literal> ]
<property> ::=
<key:literal> = <value:literal>
GROUPIDは、複数のリスナーが同じコンシューマーに属する場合に負荷分散されるように、グループ ID を確立するために使用します。Kafka サーバーのベンダーによっては、サブスクライブ先を作成しなければならない場合があります。そうでない場合は、リスナーがサブスクライブしようとしたときに、サブスクライブ先が自動的に作成されます。
INPUTTOPICは、Kafka リスナーがメッセージを受信する受信元を設定する場合に使用します。REPLYTOPICは、応答が返される Kafka キューまたはトピックの名前です。IGNOREREPLYTOPICが指定されていない場合は必須です。IGNOREREPLYTOPIC句が指定されている場合、リスナーは応答を決して返しません。INPUT: このパラメータがPLAINの場合はJSON、XMLまたはプレーンの値を受け取り、BINARYの場合は byte[] を受け取ります。OUTPUT: このパラメータがXMLの場合、リスナーの出力は XML ドキュメントになります。JSONの場合は JSON ドキュメント、BINARYの場合は BINARY ドキュメントになります。BINARYの応答は 1 行 1 列に制限されています。IGNOREREPLYTOPICが指定されていない場合に必須のパラメータです。VDPUSERは、リスナーがクエリを実行するための十分な権限を持っているかどうかを確認するために、Virtual DataPort が使用するユーザー名です。Virtual DataPort サーバーにユーザーが 2 名登録されている例を以下に示します。admin: 「管理者」、またはデータベースの任意のビューにアクセスすることのできる、データベースの「ローカル管理者」に昇格したユーザー。user1: このデータベースに対するEXECUTE権限だけを持つ「標準ユーザー」。
パラメータ
VDPUSERが「admin」の場合、リスナーは任意のクエリを実行できます。ただし、VDPUSERがuser1の場合、user1にはEXECUTE権限しかないため、CREATE、DROP、INSERT、UPDATE、DELETEクエリは失敗します。MESSAGESCONTAINVARIABLEおよびQUERY: これらが指定され、MESSAGESCONTAINVARIABLEがtrueの場合、リスナーが Kafka メッセージを受信すると、サーバーはQUERYの変数@LISTENEREXPRESSIONを Kafka メッセージの内容に置き換えます。その後、サーバーは、置換後のクエリを実行します。ENABLED:TRUEの場合、リスナーが有効になります。つまり、リスナーの作成後、そのリスナーはリクエストを待機するために Kafka サーバーへのコネクションを試みます。FALSEの場合、リスナーは無効になります。AUTHENTICATION: 3 種類の認証方法を指定できます。NONE: リスナーは、 PLAINTEXT プロトコルを使用して Kafka サーバーに接続します。SASLPLAINTEXT: リスナーは SASLPLAINTEXT プロトコルを使用して接続します。この場合、ユーザー名とパスワードを指定する必要があります。
SASLGSSAPI: リスナーは Kerberos プロトコルを使用して接続します。プリンシパル、このサーバーに接続するための資格情報を含む keytab ファイルの場所 (Virtual DataPort が動作するコンピュータにこのファイルをアップロードする必要があります)、およびサービス名を指定する必要があります。
KAFKASERVER: Kafka サーバーの IP とポート。PROPERTIES: Kafka サーバーへのコネクションを取得するために使用されるプロパティのリスト。プロパティは key = value の形式に従います。key はプロパティの名前です。常に値をプロパティに関連付ける必要があります。FOLDER: リスナーが格納されるフォルダのパス。例:FOLDER = '/Kafka listeners'DESCRIPTION: リスナーの説明。
ALTER LISTENER Kafka <name:identifier>
ENABLED EQ { true \| false }
ENABLED = TRUE で、以前は FALSE だった場合は、リクエストを待機するために Kafka サーバーへのコネクションを試みます。
ENABLED = FALSE で、以前は TRUE だった場合は、Kafka サーバーとのコネクションを閉じます。
Kafka リスナーの名前を変更するには、次のような RENAME ステートメントを使用します。
RENAME LISTENER KAFKA <name:identifier> TO <new name:identifier>
RENAME LISTENER KAFKA Kafka_listener_order TO Kafka_order_full;
