Kafka リスナーの作成¶
Kafka リスナーを作成するには、以下の手順に従って実行してください。
[File] > [New] > [Listeners] メニューで [Kafka Listener] をクリックするか、[Server Explorer] で右クリックして [New] > [Listeners] メニューで [Kafka Listener] をクリックします。
リスナー名を設定します。この名前を同じデータベース内で繰り返すことはできません。必須フィールドです。
[Kafka server] で、Kafka サーバーの IP とポートを指定します。必須フィールドです。
グループ ID を設定します。このフィールドでは、複数のリスナーが同じコンシューマーに属している場合、負荷分散を実行できます。必須フィールドです。
リスナーを有効にするには、[On] をクリックします。[Off] をクリックすると、リスナーは作成されますが、無効になります。
[VDP user name] を選択します。Virtual DataPort では、この名前を使用して、リスナーが受信したクエリを実行します。選択するユーザーは、ローカルユーザーでなければなりません。Virtual DataPort で作成されたユーザーにします。
たとえば、以下の 2 つのユーザーが存在するとします。
admin: データベースの「管理者」であるユーザーまたはデータベースの「ローカル管理者」に昇格したユーザー。データベースの任意のビューにアクセスできます。
user1: このデータベースに対する
EXECUTE権限のみを持つ「標準ユーザー」。
ユーザー「admin」を選択すると、リスナーはどのクエリも実行できるようになります。ただし、「user1」を選択した場合、「user1」には
EXECUTE権限しかないため、CREATE、DROP、INSERT、UPDATE、DELETEクエリは失敗します。パラメータ化されたクエリを使用する場合、[Kafka messages contain the value of…] チェックボックスをチェックして、その下のボックスにクエリを入力します。
すでに説明したように、このオプションを使用すると、文字列
@LISTENEREXPRESSIONは実行時に Kafka サーバーから受信した値で置き換えられます。[Input]、[Plain] または [Binary] を選択します。[Plain] は JSON、XML、Plain の値を受け入れます。[Binary] は入力トピックから byte[] を受け取り、値は VDP で使用するため BASE64 にエンコードされます。カスタム関数を使用してデコードし、blob 型に変換して blob フィールドと比較することができます。必須フィールドです。
Kafka リスナーがメッセージを受信する [Input topic] を設定します。必須フィールドです。
Kafka リスナーが VDP に対して実行したクエリの応答を別のトピックに送信しない場合は、[Ignore reply to] を選択します。
[Output]、[XML]、[JSON]、または [Binary] を選択すると、応答は特定の形式で Kafka サーバーに返されます。[Binary] には 1 行 1 列の応答制限があり、フィールドは blob 型でなければなりません。[Binary] 出力を使用すると、バイト配列が [Reply topic] に送信されます。[ignore reply to] を選択しない場合は必須です。
VDP クエリの応答を送信するトピックを指定するには、[Reply topic] を設定します。[ignore reply to] を選択しない場合は必須です。
VDP の応答に複数の行がある場合に 1 メッセージあたり N 行を送信するには、[Split response in several messages] を選択します。
各メッセージに含める行数を指定するには、[Max.number of rows in each message] を設定します。[Split the response in several messages] を選択する場合は必須です。
[Authentication] ドロップダウンで、Kafka サーバーに接続するための認証方法を指定します。
No authentication: リスナーは、 PLAINTEXT プロトコルを使用して Kafka サーバーに接続します。
SASL/PLAINTEXT: リスナーは、 SASL_PLAINTEXT プロトコルを使用して Kafka サーバーに接続します。SSL が無効の場合、使用されるプロトコルは SASL_PLAINTEXT です。この場合、ユーザー名とパスワードを入力する必要があります。
SASL/GSSAPI: リスナーは、 Kerberos プロトコルを使用して Kafka サーバーに接続します。プリンシパルと keytab ファイルを入力します。[SSL enabled] がオフの場合、使用されるセキュリティプロトコルは SASL_PLAINTEXT です。
SASL/SCRAM-SHA-512: リスナーは、 SASL_SCRAM_SHA_512 プロトコルを使用して Kafka サーバーに接続します。
[Kafka custom properties] チェックボックスで定義されたデフォルトのプロパティに値を指定する必要があります。このセクションでは、この認証方法を使用してリスナーに接続できるようにするために渡す必要があるカスタムプロパティを定義します。さらに、sasl.mechanism、security.protocol、sasl.jaas.config などのプロパティや、ssl のチェックボックスがチェックされている場合は ssl の構成に関連するプロパティ (ssl.truststore.password および ssl.truststore.location) も定義する必要があります。SSL のチェックボックスをクリアした場合、使用されるセキュリティプロトコルは SASL_PLAINTEXT です。この認証タイプを使用する場合の、カスタムプロパティが定義された Kafka リスナーの例:
認証タイプ SASL_SCRAM_SHA_512 を使用して定義された Kafka リスナー¶CREATE OR REPLACE LISTENER KAFKA kafka_listener_SHA_512 GROUPID = 'asdf' INPUTTOPIC = 'Test' REPLYTOPIC = 'test' INPUT = PLAIN OUTPUT = XML VDPUSER = 'admin' MESSAGESCONTAINVARIABLE = FALSE QUERY = '' ENABLED = FALSE AUTHENTICATION = NONE SSL = FALSE KAFKASERVER = 'localhost:9092' CUSTOM_PROPERTIES_ENABLED = TRUE # THIS NEW STATMENT PROPERTIES ( 'truststore_password_ssl' = 'password' ENCRYPTED, 'truststore_ssl' = '/path/ca.jks', 'sasl.mechanism'='SCRAM-SHA-512', 'bootstrap.servers'='server:9093', 'key.deserializer'= 'org.apache.kafka.common.serialization.StringDeserializer', 'key.serializer'= 'org.apache.kafka.common.serialization.StringSerializer', 'enable.auto.commit'='true', 'group.id'='group id', 'auto.offset.reset'='latest', 'value.deserializer'='org.apache.kafka.common.serialization.StringDeserializer', 'value.serializer'='org.apache.kafka.common.serialization.StringSerializer', 'security.protocol'='SASL_SSL', 'sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="plain text password";' )
SSL で Kafka に接続するには、[SSL enabled] を選択します。SSL が SASL_PLAINTEXT または SASL_GSSAPI 認証方法で有効になっている場合、使用されるセキュリティプロトコルは SASL_SSL であり、[NONE] 認証方法が設定されている場合、使用されるセキュリティプロトコルは SSL です。
選択する場合、以下を指定します。
Truststore file location: トラストストアファイルの場所。
Truststore password: トラストストアのパスワード。
正常に処理されないメッセージを保存し、[Reply topic] に送信しない場合は、[Send error messages to Dead Letter Queue] を選択します。
Kafka のデッドレターキューとは 1 つ以上の Kafka トピックであり、エラーが原因で他のストリーミングパイプラインで処理できなかったメッセージを受信して保存します。このオプションを選択すると、エラーメッセージの送信先のトピックを指定できます。
[Metadata] タブでは、Kafka リスナーの格納先フォルダと説明を設定します。
Kafka リスナーの作成¶
リスナーを作成したら、[Kafka Listener Status] ダイアログ ([Tools] > [Listeners] > [Kafka Listeners] メニュー) を開いて、既存の Kafka リスナーのリストを表示できます。
既存の Kafka リスナーのリスト¶
Kafka リスナーの名前の横にあるチェックボックスをチェックして、[Enable selected] または [Disable selected] をクリックすることで、そのリスナーを有効または無効にできます。
リスナーを削除するには、[Server Explorer] でリスナーを右クリックして [Drop] をクリックします。
プロセスは Design Studio を使用する場合も同様です。
リスナーを作成すると、[Kafka Listener Status] ダイアログで Kafka リスナーのリストを確認できます。
