USER MANUALS

Kafka リスナーの定義

Virtual DataPort では Kafka サーバーにサブスクライブして、リクエストを待機できます。そのため、クライアントは、JDBC、ODBC、または Web サービスを介して Virtual DataPort に接続する代わりに、Kafka サーバーにリクエストを送信できます。Kafka サーバーは、受け取ったリクエストを Virtual DataPort に転送します。その後、Virtual DataPort からの応答が Kafka サーバーのトピックに返され、クライアントに転送されます。

Kafka リスナーを作成するときは、次の処理を実行できるよう設定できます。

  • Kafka サーバーから受信した VQL ステートメントを実行する。

  • あるいは、Kafka リスナーで補間変数 (@LISTENEREXPRESSION) を使用してクエリを定義し、実行時にこの変数を Kafka サーバーから受信した値で置き換える。

Kafka リスナーの出力は、XML ドキュメント、JSON ドキュメント、Binary ドキュメントのいずれかを選択できます。

オプション (a) の動作例: クライアントは、 SELECT * FROM internet_inc WHERE iinc_id=1 などのメッセージを Kafka サーバーに送信します。Kafka サーバーは、このメッセージを Virtual DataPort に転送します。Virtual DataPort は応答を返しますが、選択した出力が XML の場合は、「 XML response message sent by a Kafka listener 」、JSON の場合は、「 JSON response message sent by a Kafka listener 」、あるいは「 Binary response message sent by a Kafka listener 」のようになります。

Kafka リスナーが送信する XML 応答メッセージ
<?xml version="1.0" encoding="UTF-8"?>
<response>
    <item>
        <iinc_id>1.00</iinc_id>
        <summary>Error in ADSL router</summary>
        <ttime>29-jun-2005 19h 19m 41s</ttime>
        <taxid>B78596011</taxid>
        <specific_field1>1</specific_field1>
        <specific_field2>1</specific_field2>
    </item>
</response>
Kafka リスナーが送信する JSON 応答メッセージ
[
    {
        "IINC_ID": 1,
        "SUMMARY": "Error in ADSL router",
        "TTIME": "2005-06-29",
        "TAXID": "B78596011",
        "SPECIFIC_FIELD1": "1",
        "SPECIFIC_FIELD2": "1"
    }
]

リクエストが ALTER VIEW incidents CACHE INVALIDATE などの DML センテンスの場合、応答は空になります (「 XML response message to a DML query 」および「 JSON response message to a DML query 」を参照)。

DML クエリへの XML 応答メッセージ
<?xml version="1.0" encoding="UTF-8"?>
<response />
DML クエリへの JSON 応答メッセージ
[]

オプション (b) の動作例: 次のクエリを使用していて Kafka リスナーを作成したとします。

Kafka リスナーで定義したサンプルクエリ
SELECT * FROM incidents
WHERE taxid = '@KafkaEXPRESSION'

リスナーが値 B78596014 を受信すると、サーバーは次のクエリを実行します。

SELECT * FROM incidents
WHERE taxid = 'B78596014'

Kafka リスナーで定義したクエリに @LISTENEREXPRESSION 変数が含まれていない場合、リスナーから受信した値は無視され、クエリは「そのまま」実行されます。

次の図には、Kafka リスナーを扱うさまざまなコマンドの構文が含まれています。

CREATE LISTENER KAFKA ステートメントの構文
CREATE [ OR REPLACE ] LISTENER KAFKA <name:identifier> [ID = <id:literal> ]
    GROUPID = <name:literal>
    INPUTTOPIC = <name:literal>
    [ REPLYTOPIC = <name:literal> ]
    [ IGNOREREPLYTOPIC ]
    [ INPUT = { PLAIN | BINARY } ]
    [ OUTPUT = { JSON | XML | BINARY } ]
    [ VDPDATABASE = <name:literal> ]
    VDPUSER = <name:literal>
    [ MESSAGESCONTAINVARIABLE = { TRUE | FALSE } QUERY = <query:literal> ]
    ENABLED = { TRUE | FALSE }
    AUTHENTICATION = { NONE | SASLPLAINTEXT | SASLGSSAPI }
    [ SSL = { TRUE | FALSE } ]
    KAFKASERVER = <name:literal>
    [ CHUNKSIZE = <chunksize:literal> ]
    [ DLQ = <dql:literal> ]
    PROPERTIES ( <property> [, <property> ]* )
    [ FOLDER = <folder:literal> ]
    [ DESCRIPTION = <description:literal> ]

<property> ::=
    <key:literal> = <value:literal>
  • GROUPID は、複数のリスナーが同じコンシューマーに属する場合に負荷分散されるように、グループ ID を確立するために使用します。

    Kafka サーバーのベンダーによっては、サブスクライブ先を作成しなければならない場合があります。そうでない場合は、リスナーがサブスクライブしようとしたときに、サブスクライブ先が自動的に作成されます。

  • INPUTTOPIC は、Kafka リスナーがメッセージを受信する受信元を設定する場合に使用します。

  • REPLYTOPIC は、応答が返される Kafka キューまたはトピックの名前です。IGNOREREPLYTOPIC が指定されていない場合は必須です。

    IGNOREREPLYTOPIC 句が指定されている場合、リスナーは応答を決して返しません。

  • INPUT: このパラメータが PLAIN の場合は JSONXML またはプレーンの値を受け取り、 BINARY の場合は byte[] を受け取ります。

  • OUTPUT: このパラメータが XML の場合、リスナーの出力は XML ドキュメントになります。JSON の場合は JSON ドキュメント、 BINARY の場合は BINARY ドキュメントになります。BINARY の応答は 1 行 1 列に制限されています。IGNOREREPLYTOPIC が指定されていない場合に必須のパラメータです。

  • VDPUSER は、リスナーがクエリを実行するための十分な権限を持っているかどうかを確認するために、Virtual DataPort が使用するユーザー名です。Virtual DataPort サーバーにユーザーが 2 名登録されている例を以下に示します。

    • admin: 「管理者」、またはデータベースの任意のビューにアクセスすることのできる、データベースの「ローカル管理者」に昇格したユーザー。

    • user1: このデータベースに対する EXECUTE 権限だけを持つ「標準ユーザー」。

    パラメータ VDPUSER が「admin」の場合、リスナーは任意のクエリを実行できます。ただし、 VDPUSERuser1 の場合、 user1 には EXECUTE 権限しかないため、 CREATEDROPINSERTUPDATEDELETE クエリは失敗します。

  • MESSAGESCONTAINVARIABLE および QUERY: これらが指定され、 MESSAGESCONTAINVARIABLEtrue の場合、リスナーが Kafka メッセージを受信すると、サーバーは QUERY の変数 @LISTENEREXPRESSION を Kafka メッセージの内容に置き換えます。その後、サーバーは、置換後のクエリを実行します。

  • ENABLED: TRUE の場合、リスナーが有効になります。つまり、リスナーの作成後、そのリスナーはリクエストを待機するために Kafka サーバーへのコネクションを試みます。FALSE の場合、リスナーは無効になります。

  • AUTHENTICATION: 3 種類の認証方法を指定できます。

    • NONE: リスナーは、 PLAINTEXT プロトコルを使用して Kafka サーバーに接続します。

    • SASLPLAINTEXT: リスナーは SASLPLAINTEXT プロトコルを使用して接続します。

      この場合、ユーザー名とパスワードを指定する必要があります。

    • SASLGSSAPI: リスナーは Kerberos プロトコルを使用して接続します。プリンシパル、このサーバーに接続するための資格情報を含む keytab ファイルの場所 (Virtual DataPort が動作するコンピュータにこのファイルをアップロードする必要があります)、およびサービス名を指定する必要があります。

  • KAFKASERVER: Kafka サーバーの IP とポート。

  • PROPERTIES: Kafka サーバーへのコネクションを取得するために使用されるプロパティのリスト。プロパティは key = value の形式に従います。key はプロパティの名前です。常に値をプロパティに関連付ける必要があります。

  • FOLDER: リスナーが格納されるフォルダのパス。例: FOLDER = '/Kafka listeners'

  • DESCRIPTION: リスナーの説明。

Kafka リスナーを有効または無効にするコマンド: ALTER LISTENER Kafka
ALTER LISTENER Kafka <name:identifier>
ENABLED EQ { true \| false }

ENABLED = TRUE で、以前は FALSE だった場合は、リクエストを待機するために Kafka サーバーへのコネクションを試みます。

ENABLED = FALSE で、以前は TRUE だった場合は、Kafka サーバーとのコネクションを閉じます。

Kafka リスナーの名前を変更するには、次のような RENAME ステートメントを使用します。

ステートメント RENAME LISTENER Kafka の構文
RENAME LISTENER KAFKA <name:identifier> TO <new name:identifier>
RENAME LISTENER KAFKA Kafka_listener_order TO Kafka_order_full;
Add feedback