Overview
Denodo serves as a powerful integration layer for enabling real-time data synchronization across diverse systems. By combining Debezium for Change Data Capture (CDC) with Denodo, organizations can capture source changes and automatically propagate them to materialized tables through Denodo. This approach enables users to manage and distribute real-time data across heterogeneous systems, ensuring consistency, reducing integration complexity, and delivering up-to-date insights without relying on complex ETL pipelines.
Architecture
This solution integrates Debezium, an open-source Change Data Capture (CDC) tool, with Denodo’s Kafka Listeners to enable real-time data synchronization.
- Debezium continuously monitors source databases and streams row-level changes to Kafka topics.
- Denodo subscribes to these topics via its Kafka Listener and automatically applies the same changes to defined target systems.
This integration creates a seamless, real-time data pipeline that keeps Denodo’s virtualized data layer synchronized with live updates from source systems.
Key Components
- Debezium is an open-source CDC (Change Data Capture) tool that captures data modifications in source systems and streams those changes in real time through Kafka. The supported source systems include:
- MongoDB
- MySQL
- PostgreSQL
- SQL Server
- Oracle
- Db2
- Cassandra
- Vitess (Incubating)
- Spanner
- Informix (Incubating)
These connectors allow Debezium to monitor database logs and publish change events to Kafka topics, making them consumable by downstream applications or platforms such as Denodo.
- Apache Kafka acts as the central event streaming platform in this architecture. It receives the CDC events from Debezium connectors and stores them in Kafka topics. These topics can then be consumed by Denodo for processing, transformation, or synchronization.
- Denodo Kafka Listener enables Denodo to consume real-time messages from Kafka topics and apply the needed actions in the Denodo Platform, e.g. updating the remote tables according to the received messages.
- Target Table is where real-time change data is applied within the Denodo platform. It can reside in any supported data source — whether a traditional relational database (e.g., MySQL, PostgreSQL, Oracle…) or a file system or object storage using open table formats. CDC events captured by Debezium and streamed through Kafka can be directly applied to these targets to maintain real-time synchronization.
For scenarios involving large data volumes that need to be materialized in object storage, an extended pattern can be adopted. In this approach, Denodo’s MPP engine periodically offloads accumulated changes to the target files, while recent CDC events are temporarily stored in a buffer table within a transactional database. By creating a union view in Denodo that combines both the static data in object storage and the temporary changes in the buffer, users can query up-to-date results in real time. This ensures low-latency access to the latest data while efficiently managing large-scale updates.
Prerequisites
This section outlines the prerequisites for setting up a real-time Change Data Capture (CDC) pipeline. It focuses on configuring the core components including Kafka, Kafka Connect, and Debezium, which are required to capture and stream data changes from a source database (e.g., PostgreSQL). Once these components are in place and properly registered, the CDC events can later be integrated with the Denodo Kafka Listener to automatically apply changes to target systems.
Step 1: Enable Logical Replication in Source
This step prepares the source system (e.g., PostgreSQL) for Debezium to capture data changes by enabling logical replication. Logical replication ensures that all data modifications are logged and can be streamed to Kafka. Below are example steps for PostgreSQL as source system:
- Modify postgresql.conf (on Windows this is typically located at C:\Program Files\PostgreSQL\<version>\data\postgresql.conf):
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
These settings are essential for enabling logical replication, which Debezium requires to capture database changes. The value 4 is chosen as a balanced default: more than the absolute minimum required (which is 1), but not so high that it wastes system resources. With 4 replication slots and 4 WAL senders, PostgreSQL can support:
- One Debezium connector for CDC.
- One replication slot for schema history (if applicable).
- One extra slot for future tables, testing, or failover scenarios.
- One spare slot to ensure replication does not break when scaling or reconfiguring. This provides flexibility and reliability without requiring a restart later if more components need to connect for logical replication.
- Restart the PostgreSQL service. To confirm the properties were applied, run show wal_level from a SQL shell and check that the value has been updated, as shown below.
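For example, from psql (a minimal verification, assuming the settings above):

SHOW wal_level;             -- expected: logical
SHOW max_replication_slots; -- expected: 4
SHOW max_wal_senders;       -- expected: 4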
- Create a replication user in PostgreSQL (Debezium will use this user to read change events from the replication slot):
CREATE ROLE your_role WITH LOGIN REPLICATION PASSWORD 'your_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO your_role;
ALTER TABLE your_table REPLICA IDENTITY FULL; -- to be able to capture full row data
Step 2: Kafka Setup
If Kafka is already set up in your environment, simply start your Kafka services. If not, you can follow the example steps below to quickly set up Kafka.
- Download Kafka.
- Configure Kafka for KRaft mode (if your Kafka version is lower than 3.0, you need to run ZooKeeper instead):
- Generate a Cluster UUID: bin/kafka-storage.sh random-uuid
- Format the Storage with KRaft: bin/kafka-storage.sh format -t <uuid> -c config/kraft/server.properties
- Set the log path in config/kraft/server.properties so the UUID does not have to be regenerated on restart: log.dirs=<your_kafka_path>/kafka/kafka-logs
- Start the Kafka server in KRaft mode (no ZooKeeper required):
bin/kafka-server-start.sh config/kraft/server.properties
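To confirm the broker is running, you can list the existing topics (assuming the broker listens on localhost:9092):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list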
Step 3: Kafka Connect with Debezium Setup
Kafka Connect is a framework for integrating Kafka with external systems, while the Debezium connector runs on top of it to capture and stream database changes into Kafka topics. The following steps show how to set up Kafka Connect with the Debezium PostgreSQL connector.
- Download Debezium PostgreSQL Connector then extract the archive.
- Move the plugin to Kafka’s plugin path (debezium-connector-postgresql to </your_path/to/kafka/plugins>).
- Configure Kafka Connect (config/connect-distributed.properties)
- If you prefer to process the Kafka messages in Struct format, set up the config file as below:
# bootstrap.servers is normally localhost:9092
bootstrap.servers=kafka_host:kafka_port
plugin.path=/path/to/kafka/plugins
# rest.port is normally 8083
rest.port=your_port
group.id=kafka-connect-group
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
This ensures that the value of each topic message is returned as a Struct, which can be processed by the Denodo custom stored procedure denodo_cdc_debezium_struct explained later in this document.
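For reference, the string form of such a Struct value looks roughly like the following (an abbreviated, hypothetical event; the actual fields depend on your table and connector version):

Struct{after=Struct{name=Alice,column_1=value},source=Struct{connector=postgresql,db=your_database_to_listen,schema=public,table=your_table},op=c,ts_ms=1700000000000}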
- If you prefer to process the Kafka messages in JSON Schema format, set up the config file as below:
# bootstrap.servers is normally localhost:9092
bootstrap.servers=kafka_host:kafka_port
plugin.path=/path/to/kafka/plugins
# rest.port is normally 8083
rest.port=your_port
group.id=kafka-connect-group
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
This ensures that the value of each topic message is returned as JSON with an embedded schema, which can be processed by the Denodo custom stored procedure denodo_cdc_debezium_json explained later in this document.
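For reference, a change event in this format looks roughly like the following (abbreviated and hypothetical; the schema section and field names depend on your table):

{
  "schema": { "type": "struct", "name": "postgres-db.public.your_table.Envelope", "fields": [ ... ] },
  "payload": {
    "before": null,
    "after": { "name": "Alice", "column_1": "value" },
    "source": { "connector": "postgresql", "db": "your_database_to_listen", "schema": "public", "table": "your_table" },
    "op": "c",
    "ts_ms": 1700000000000
  }
}

The op field indicates the operation type: c (create), u (update), d (delete), or r (snapshot read).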
- Start Kafka Connect from your Kafka installation directory:
bin/connect-distributed.sh config/connect-distributed.properties
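You can verify that Kafka Connect started and that the Debezium plugin was loaded by querying its REST API (assuming the REST port configured above is 8083):

curl http://localhost:8083/connector-plugins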
Step 4: Register the Debezium Kafka Connector
After setting up Kafka Connect, you need to register the Debezium connector so it can start capturing changes from your source database and publishing them to Kafka topics. The example below shows how to configure and register a PostgreSQL connector.
- Create a JSON configuration file (register-postgres.json):
{ "name": "postgres-debezium-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "your_postgresql_server", "database.port": "your_postgresql_port", "database.user": "'user_you_create'", "database.password": "your_password", "database.dbname": "your_database_to_listen", "database.server.name": "local-postgres", "plugin.name": "pgoutput", "slot.name": "debezium_slot", "publication.name": "dbz_publication", "table.include.list": "public.your_table", "topic.prefix": "postgres-db", "transforms": "route", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": ".*", "transforms.route.replacement": "postgres-db.public.your_topic_name" } } |
- Register the connector using:
curl -X POST -H "Content-Type: application/json" --data @/path-to-your-file/register-postgres.json http://localhost:8083/connectors
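To confirm the connector registered successfully and is producing events, you can check its status and watch the target topic (assuming the defaults used above):

curl http://localhost:8083/connectors/postgres-debezium-connector/status
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic postgres-db.public.your_topic_name --from-beginning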
Denodo Setup
Once Debezium starts publishing source table changes into Kafka topics, the next step is to configure Denodo to listen to these topics and apply the real-time updates to the target tables.
- Create the Target Table in Denodo
- The target table is where CDC (Change Data Capture) events are ultimately applied within the Denodo Platform. It typically mirrors the schema of the source table, containing the same columns and data structure. Denodo’s Kafka Listener consumes change events from Kafka and applies corresponding insert or delete operations to keep this target synchronized with the source in real time.
- For large-scale data or object storage scenarios, an extended pattern can be adopted to balance performance and scalability. In this setup:
- The target table (e.g., in object storage) holds the persisted, periodically offloaded data.
- A buffer table (a relational remote table) stores real-time CDC events that occur between two offload intervals. Unlike the main target table, the buffer table contains all columns from the source table plus two additional columns (see the sketch after this list):
- operation — to indicate the type of change (e.g., insert, delete)
- insert_time — to record the timestamp when the change event was received
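As an illustration, a hypothetical buffer table for a source table keyed by name might be created in the transactional database as follows (all column names are placeholders):

-- Buffer table: source columns plus CDC metadata
CREATE TABLE buffer_table (
    name        VARCHAR(255),  -- source key column (placeholder)
    column_1    VARCHAR(255),  -- remaining source columns (placeholders)
    operation   VARCHAR(10),   -- type of change: 'insert' or 'delete'
    insert_time TIMESTAMP      -- when the change event was received
);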
During query execution or offload operations, users can create a virtual cleanup view on top of this buffer table to ensure only the latest record for each primary key is retained, avoiding redundant updates before merging with or offloading to the final target.
Below is an example VQL on Denodo for creating such a virtual cleanup view on top of the buffer table.
CREATE OR REPLACE VIEW buffer_table_clean FOLDER = '/your_folder' AS
WITH min_rows AS (
    SELECT * FROM (
        SELECT buffer_table.*, row_number() OVER (PARTITION BY name ORDER BY insert_time ASC) AS rn
        FROM buffer_table
    ) WHERE rn = 1
),
max_rows AS (
    SELECT * FROM (
        SELECT buffer_table.*, row_number() OVER (PARTITION BY name ORDER BY insert_time DESC) AS rn
        FROM buffer_table
    ) WHERE rn = 1
)
SELECT * FROM min_rows
WHERE min_rows.insert_time = (SELECT insert_time FROM max_rows WHERE max_rows.name = min_rows.name)
SQL UNION ALL
SELECT * FROM min_rows
WHERE min_rows.name IN (
    SELECT m.name FROM min_rows AS m
    INNER JOIN max_rows AS x ON m.name = x.name
    WHERE (m.operation = 'delete' AND x.operation = 'delete' AND m.insert_time <> x.insert_time)
)
SQL UNION ALL
SELECT * FROM max_rows
WHERE max_rows.name IN (
    SELECT m.name FROM min_rows AS m
    INNER JOIN max_rows AS x ON m.name = x.name
    WHERE (m.operation = 'insert' AND x.operation = 'insert' AND m.insert_time <> x.insert_time)
)
SQL UNION ALL
SELECT * FROM min_rows
WHERE min_rows.name IN (
    SELECT m.name FROM min_rows AS m
    INNER JOIN max_rows AS x ON m.name = x.name
    WHERE (m.operation = 'delete' AND x.operation = 'insert' AND m.insert_time <> x.insert_time)
)
SQL UNION ALL
SELECT * FROM max_rows
WHERE max_rows.name IN (
    SELECT m.name FROM min_rows AS m
    INNER JOIN max_rows AS x ON m.name = x.name
    WHERE (m.operation = 'delete' AND x.operation = 'insert' AND m.insert_time <> x.insert_time)
);
Once the cleanup view has been created, you can define a virtual union view in Denodo that combines the buffer table with the target object storage table. This union view enables users to query both the latest real-time changes and the persisted data as a single, up-to-date dataset. Below is an example VQL on Denodo for creating the union view:
CREATE OR REPLACE VIEW final_union FOLDER = '/your_folder' AS
WITH deleted_keys AS (
    SELECT DISTINCT name FROM buffer_table_clean WHERE operation = 'delete'
)
SELECT test.name, test.column_1, ... test.column_n
FROM target_table AS test
LEFT OUTER JOIN deleted_keys AS d ON test.name = d.name
WHERE d.name IS NULL
SQL UNION ALL
SELECT name, column_1, ..., column_n
FROM buffer_table_clean
WHERE operation = 'insert'
- Follow Importing Java Stored Procedures to import denodo_cdc_debezium.jar. Depending on your Kafka message format, set the class name to com.denodo.vdp.demo.storedprocedure.DenodoCdcDebeziumStruct or com.denodo.vdp.demo.storedprocedure.DenodoCdcDebeziumJson.
If you are not sure which one to use, review your config/connect-distributed.properties file, described in the “Kafka Connect with Debezium Setup” section above.
This procedure automatically captures real-time data changes and transforms each message into executable VQL, which is then applied to the target table. A detailed explanation of the logic is provided in the “Walk Through the Code” section of this document.
- Create a Kafka Listener to subscribe to the Debezium topic.
In the listener's query session, use one of the following execution modes:
- Static Execution (Single Target Table in a Known Database)
If all changes are intended to be applied to a single, known table within a specific database, and this target remains the same for all messages, you can use the procedure in direct execution mode:
- To apply changes directly to the target table:
call denodo_cdc_debezium_struct('@LISTENEREXPRESSION', false, 'target_db.target_table')
or
call denodo_cdc_debezium_json('@LISTENEREXPRESSION', false, 'target_db.target_table')
- To store changes in the buffer table:
call denodo_cdc_debezium_struct('@LISTENEREXPRESSION', true, 'buffer_db.buffer_table')
or
call denodo_cdc_debezium_json('@LISTENEREXPRESSION', true, 'buffer_db.buffer_table')
- Dynamic Execution Using a Mapping View
For more complex scenarios involving multiple source tables or varying destinations, you can configure Denodo to dynamically determine the target table based on message content. To do this, omit the third parameter:
- To apply changes directly to the target table:
call denodo_cdc_debezium_struct('@LISTENEREXPRESSION', false)
or
call denodo_cdc_debezium_json('@LISTENEREXPRESSION', false)
- To store changes in the buffer table:
call denodo_cdc_debezium_struct('@LISTENEREXPRESSION', true)
or
call denodo_cdc_debezium_json('@LISTENEREXPRESSION', true)
Note: The column names in the Excel example file must remain unchanged for the procedure to resolve the correct target table.
- Setup Completed.
Once the above setup is complete, you can seamlessly integrate Debezium with the Denodo Kafka Listener to stream CDC (Change Data Capture) changes directly into your target Denodo tables.
- Without the buffer table pattern:
You can query real-time data directly on your remote target table.
- With the buffer table pattern:
You can query real-time data on the union view built from your buffer table and your final file system table. Once you have offloaded the changes from the cleanup view of your buffer table, you can delete all records in the buffer table and wait for new changes to arrive, as sketched below.
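As a rough illustration of one offload cycle (generic SQL with placeholder names; in practice the offload to object storage is performed through Denodo's MPP engine, so adapt these steps to your environment):

-- 1. Apply buffered deletes to the target
DELETE FROM target_table
WHERE name IN (SELECT name FROM buffer_table_clean WHERE operation = 'delete');
-- 2. Append the latest buffered inserts to the target
INSERT INTO target_table (name, column_1)
SELECT name, column_1 FROM buffer_table_clean WHERE operation = 'insert';
-- 3. Clear the buffer so it only accumulates new changes
DELETE FROM buffer_table;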
- Execution Monitoring:
Successful Executions:
You can verify the executed VQL statements in the “Reply Topic” specified in your Denodo Kafka Listener configuration.
Failed Executions:
Any errors encountered during execution will be logged in the “Topic to Send Error Messages” that you also configured.
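For example, you can follow both topics with the Kafka console consumer (topic names here are hypothetical; use the names from your listener configuration):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic denodo-replies --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic denodo-errors --from-beginning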
Walk Through the Code
This section describes the custom Java stored procedure developed to transform Debezium messages for execution within the Denodo Platform. It explains the procedure’s purpose, core functionality, source code structure, and compilation steps. If future requirements arise, this section can be revisited to modify the procedure’s logic and recompile the JAR file accordingly.
Procedure’s Workflow
Source Code
Here is the Java source code for DenodoCdcDebeziumStruct and Java source code for DenodoCdcDebeziumJson used to generate the provided JAR file. If additional functionality or adjustments are required, this source code can be updated and recompiled to meet future requirements.
Compilation Steps
If any modifications are made to the Java source file, it must be recompiled and then re-imported into the Denodo Platform.
Steps to compile the stored procedure:
- Place the Java file under <DENODO_HOME>\samples\vdp\storedProcedures\src\com\denodo\vdp\demo\storedprocedure
- Navigate to <DENODO_HOME>\samples\vdp\storedProcedures\scripts. Modify the compile_storeprocedures.bat file as follows:
@echo off
REM ---------------------------------------------------------------------------
REM Environment variable JAVA_HOME must be set and exported
REM ---------------------------------------------------------------------------
SET DENODO_HOME=<DENODO_HOME>
SET SAMPLES_HOME=%DENODO_HOME%/samples/vdp/storedProcedures
SET DENODOVDPSERVER_JAR=%DENODO_HOME%/lib/vdp-server-core/denodo-vdp-server.jar
SET COMMONSLOGGING_JAR=%DENODO_HOME%/lib/contrib/commons-logging.jar
SET DENODOUTIL_JAR=%DENODO_HOME%/lib/contrib/denodo-commons-util.jar
SET DENODOVDBPARSER_JAR=%DENODO_HOME%/lib/vdp-client-core/denodo-vdp-parser.jar
SET JTA_JAR=%DENODO_HOME%/lib/contrib/ow2-jta-1.1-spec.jar
SET SRC_DIR=%DENODO_HOME%/samples/vdp/storedProcedures/src

SET JAVAC_BIN=%JAVA_HOME%\bin\javac.exe
if NOT exist "%JAVAC_BIN%" SET JAVAC_BIN=javac.exe
SET JAR_BIN=%JAVA_HOME%\bin\jar.exe
if NOT exist "%JAR_BIN%" SET JAR_BIN=jar.exe

"%JAVAC_BIN%" -d "%SAMPLES_HOME%/target/classes" -classpath "%DENODOUTIL_JAR%;%COMMONSLOGGING_JAR%;%DENODOVDPSERVER_JAR%;%DENODOVDBPARSER_JAR%;%JTA_JAR%" "%SRC_DIR%/com/denodo/vdp/demo/storedprocedure/your_new_procedure.java"

if NOT exist "%SAMPLES_HOME%/target/jars" ( mkdir "%SAMPLES_HOME%/target/jars" )

"%JAR_BIN%" cf "%SAMPLES_HOME%/target/jars/your_new_procedure.jar" -C "%SAMPLES_HOME%/target/classes" .
- Open a terminal in this folder and run the .bat file.
- The compiled .jar file will now be available under <DENODO_HOME>\samples\vdp\storedProcedures\target\jars.
- Import the generated JAR into Denodo.
Conclusion
Together, Debezium, Kafka, the Denodo Kafka Listener and the Target Table create a cohesive and scalable real-time data synchronization architecture. Debezium captures change events directly from source systems, Kafka reliably transports these events, and Denodo’s Kafka Listener applies the updates to the designated target tables or views. Whether the target resides in a database or in an object storage–based table format, this coordinated workflow ensures that Denodo always reflects the latest state of the underlying systems. The result is a modern, efficient, and low-latency integration pattern that minimizes ETL complexity while delivering continuously up-to-date data to downstream consumers.
The information provided in the Denodo Knowledge Base is intended to assist our users in advanced uses of Denodo. Please note that the results from the application of processes and configurations detailed in these documents may vary depending on your specific environment. Use them at your own discretion.
For an official guide of supported features, please refer to the User Manuals. For questions on critical systems or complex environments, we recommend that you contact your Denodo Customer Success Manager.

