Objective
In this article, we describe step by step how to build a workflow that automates the onboarding of datasets such as JSON, CSV, XML and Excel files as part of the Denodo Catalog. This enables Denodo business power users to consume and combine files in an easy way through the Denodo Platform.
For an existing folder that contains different file types, this feature creates Base Views automatically.
Implementing a “Bring Your Own Data” workflow
Pre-configuration steps
In order to implement this workflow, it is necessary to:
- Download and Install the FileSystem Custom Wrapper
- Download and Install the VQL Generation Stored Procedures
Note: The linked pages contain highlighted sections that should be followed in case the Stored Procedures and the Custom Wrapper are not already installed.
Implementation
In this example, files are placed in a local path, that is, a folder accessible to the VDP server. This can be, for example, a network drive accessible to both users and the VDP server(s). The figure below displays the location of the files and the files that are available. The full path of the files is: ‘B:\Bring Your Own Data’.
Figure 1.1 Bring Your Own Data folder
To implement this workflow we will use two virtual databases. The first one, ‘byod_setup’, contains the data sources, base views, stored procedures, etc. that are instrumental in generating the mapping data sources and views in sync with the files. The second one, ‘byod’, will contain the automatically generated data sources and base views mapping the files from the file resource location, in this example ‘B:\Bring Your Own Data’.
To create the two databases the following steps should be taken:
- Open the Denodo Design Studio
- Go to Administration > Database Management
- Click on New
- Enter the name ‘byod_setup’ and a description.
- Click on Ok button.
- Redo steps 3 - 5 for the database named ‘byod’.
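Alternatively, both databases can be created from the VQL Shell. A minimal sketch (the descriptions are illustrative):

CREATE DATABASE byod_setup 'Setup elements for the Bring Your Own Data workflow';
CREATE DATABASE byod 'Automatically generated data sources and base views';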
For a better overview at the end of this implementation, it is recommended to organize the newly created data sources, base views, stored procedures, etc. in separate Denodo folders in the Virtual DataPort Administration Tool. This article follows the VDP Naming Convention.
The image below describes the folder structure for the ‘byod_setup’ and the ‘byod’ virtual databases.
Figure 1.2 Describes the databases structure
The VQL code below will create the folders described for the ‘byod_setup’ database.
CREATE OR REPLACE FOLDER '/01 - Connectivity' DESCRIPTION 'This folder stores the data sources and base views of the project';
CREATE OR REPLACE FOLDER '/01 - Connectivity/01 - Data Sources' DESCRIPTION 'This folder stores the data sources of the project';
CREATE OR REPLACE FOLDER '/01 - Connectivity/02 - Base Views' DESCRIPTION 'This folder stores the base views of the project';
CREATE OR REPLACE FOLDER '/02 - Integration' DESCRIPTION 'The integration views created for combining the different base views are stored in this folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Files' DESCRIPTION 'This folder stores the view listing all files of the local folder';
CREATE OR REPLACE FOLDER '/02 - Integration/02 - Files Extraction' DESCRIPTION 'This folder stores the view listing the files that should be created';
CREATE OR REPLACE FOLDER '/02 - Integration/03 - Files Deletion' DESCRIPTION 'This folder stores the view listing the files that should be deleted';
CREATE OR REPLACE FOLDER '/02 - Integration/04 - VQL Generator' DESCRIPTION 'This folder stores the view returning the VQL for creating data sources and base views of the files';
CREATE OR REPLACE FOLDER '/03 - Stored Procedures' DESCRIPTION 'This folder stores Stored procedures';
The VQL code below will create the folders described for the ‘byod’ database.
CREATE OR REPLACE FOLDER '/02 - Integration' DESCRIPTION 'The integration views created for combining the different base views are stored in this folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View' DESCRIPTION 'This folder stores the view listing all files of the local folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source' DESCRIPTION 'This folder stores the data sources of the database';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View' DESCRIPTION 'This folder stores the base views of the database';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/Delimited Files' DESCRIPTION 'This folder stores the data sources of Delimited Files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/Excel' DESCRIPTION 'This folder stores the data sources of Excel files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/JSON' DESCRIPTION 'This folder stores the data sources of JSON files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/XML' DESCRIPTION 'This folder stores the data sources of XML files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/Delimited Files' DESCRIPTION 'This folder stores the base views of Delimited Files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/Excel' DESCRIPTION 'This folder stores the base views of Excel files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/JSON' DESCRIPTION 'This folder stores the base views of JSON files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/XML' DESCRIPTION 'This folder stores the base views of XML files';
The next steps present the configuration of the data sources, views and stored procedures needed to set up the ‘byod_setup’ database.
Step 1: Create a data source to extract a list of files
- From the Web Design Studio or the Virtual DataPort Administration Tool:
- Click the + sign beside the ‘byod_setup’ database
- Click on the ellipsis icon or right-click on /01 - connectivity/01 - data sources > New > Data source > Custom.
- A new window (“new_custom_datasource”) will open, in which:
- Specify the name of the Custom Data Source (e.g ds_localfolder).
- Check the box “Select jars”.
- Choose the existing extension denodo-filesystem-customwrapper.
- As “Class Name” select com.denodo.connect.filesystem.ReadFileSystemConnector.
- Click Save.
Figure 1.3 Shows how to open a “new_custom_datasource”
Figure 1.4 Creates the custom data source.
Step 2: Create the Base View to retrieve a list of all files
- Open the just-created data source ds_localfolder.
- Click on Create base view.
- Edit the name (e.g. bv_localfolder).
- Click on Metadata and choose /01 - connectivity/02 - base views as the folder location.
- Click Save.
To check that everything works, you can execute the base view that we just created. Go to the Execution panel, enter parentfolder = ‘B:\Bring Your Own Data’ and recursive = false as the where condition, and click on Execute.
(Note: The recursive parameter should be set to true if the parent folder contains other folders. If this is the case, files from all the subfolders will be added to the returned list.)
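Equivalently, the same check can be run from the VQL Shell:

SELECT *
FROM bv_localfolder
WHERE parentfolder = 'B:\Bring Your Own Data'
AND recursive = false;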
Figure 2.1 Describes the output after executing the Base View
- Create a derived view iv_list_of_files. This view will return the list of files from the folder(s), but will also return the data source names and base view names for the files using naming conventions.
- Set iv_list_of_files to auto-generate the correct names for the files:
- Go to the Output tab.
- Click on New > New Field.
- Choose a Field Name for example name_for_new_baseview.
- Add the following expression for the new field:
'bv_' || regexp(filename, '\.[^.]*$', '')
- Click Ok.
- Repeat the same steps to create a new field for the data source name, name_for_new_datasource. In this case, the expression is:
'ds_' || regexp(filename, '\.[^.]*$', '')
- Click on Metadata and choose /02 - integration/02 - files extraction as the folder location
- Click Save.
Figure 2.2 Shows the steps to create the new field with the base view names.
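For instance, a sketch of how these expressions behave for a hypothetical file named customers.csv (Dual() is Denodo's built-in single-row view):

SELECT 'bv_' || regexp('customers.csv', '\.[^.]*$', '') AS name_for_new_baseview,
       'ds_' || regexp('customers.csv', '\.[^.]*$', '') AS name_for_new_datasource
FROM Dual();
-- returns 'bv_customers' and 'ds_customers'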
Step 3: Create the Stored Procedure to generate the VQLs
- Go to the folder /03 - stored procedures.
- Click on the ellipsis icon and go to New > Stored Procedure > Java Stored Procedure.
- Specify the name (e.g sp_type_of_file).
- Check the box “Select Jars”.
- Select denodo-vqlgeneration.
- Select one of the following as the Class Name:
- com.denodo.connect.vqlgeneration.df.GenerateVQLForDFFullDataSourceBaseViewStoredProcedure for Delimited Files.
- com.denodo.connect.vqlgeneration.excel.GenerateVQLForExcelFullDataSourceBaseViewStoredProcedure for Excel files.
- com.denodo.connect.vqlgeneration.json.GenerateVQLForJSONFullDataSourceBaseViewStoredProcedure for JSON files.
- com.denodo.connect.vqlgeneration.xml.GenerateVQLForXMLFullDataSourceBaseViewStoredProcedure for XML files.
- Click Save.
- Repeat these steps for each type of file, until 4 new stored procedures are created.
Figure 3.1 Shows how to import a new Java stored procedure
Figure 3.2 Describes how to create the stored procedure for Excel files
To check that everything works fine, specify all the necessary attributes and execute the stored procedure.
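As a sketch, assuming the JSON stored procedure was named sp_json and using the attribute names discussed later in this article (consult the Denodo VQL Generation Stored Procedures - User Manual for the exact list), a test execution from the VQL Shell could look like:

SELECT result
FROM sp_json()
WHERE file_path_url = 'B:\Bring Your Own Data\sample.json'
AND database_name = 'byod'
AND folder = '/02 - Integration/01 - Final View/01 - Data Source/JSON'
AND base_view_folder = '/02 - Integration/01 - Final View/02 - Base View/JSON';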
When the base view for the list of files is executed, the output will include all the files in the folder, even if base views were already created for some of them. This has an impact on the whole process. A more elegant solution is to create a view that returns only the files that do not yet have a base view, to avoid recreating views and data sources that already exist.
Step 4: Creating a view to display only new files
Note: The names of the data sources are created by adding the prefix ds_ and removing the extension.
Using this naming convention, the name of the data source can be mapped back to the initial filename. With this, we can use the GET_ELEMENTS stored procedure to retrieve only the files without a previously created data source. The following view compares the data source names returned by GET_ELEMENTS with the expected names derived from the file list.
- We want to get existing data sources, so we set type = 'datasource'.
- Execute the code below in the VQL Shell.
CREATE VIEW iv_elements_to_be_created AS
SELECT *
FROM iv_list_of_files
WHERE (parentfolder = input_path
  AND recursive = is_recursive
  AND name_for_new_datasource NOT IN (
    SELECT name FROM get_elements() WHERE type = 'datasource'))
USING PARAMETERS (
  input_path : text,
  is_recursive : text);
This view will only return the list of files for which no view or data source has been created yet.
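For example, to list the pending files for the folder used in this article:

SELECT *
FROM iv_elements_to_be_created
WHERE input_path = 'B:\Bring Your Own Data'
AND is_recursive = 'false';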
Figure 4.1 Shows the replace function
Now we have to join the view iv_elements_to_be_created that gets the list of new files and the stored procedure that generates the VQL code.
Step 5: Get the VQL code for each new file
We are going to create a new join view for each type of file/data source, called generate_baseview_type_of_file_extension. All joins should be named similarly, changing only the extension suffix to refer to the actual file type. The following steps join iv_elements_to_be_created and the stored procedure that obtains the VQL; create a new join for each stored procedure previously created for the different types of files.
Go to the VQL Generators folder and select New > Join:
- Drag and drop the view iv_elements_to_be_created and the stored procedure for the file type that we are going to import.
- Uncheck the box for “Use fixed input parameters” when you drag and drop the stored procedure.
- Connect these two through an arrow.
- Click on the Join Condition icon.
- Match the attributes, through drag and drop, so that the following conditions are set:
iv_elements_to_be_created.fullpath = storedProcedure.file_path_url
Figure 5.1 Shows how to create a new Join view.
Figure 5.2 Shows the “Use fixed input parameters” box you need to uncheck.
Figure 5.3 Shows how to set the join condition.
Figure 5.4 Describes the Where Condition of generate_baseview_df
Now all the obligatory conditions should be specified in the where clause. The following condition should also be added: iv_elements_to_be_created.extension = extensionSupportedByStoredProcedure.
All the attributes of every stored procedure are described in the document Denodo VQL Generation Stored Procedures - User Manual. Choose the ones right for your implementation.
Note: These conditions should be set like this:
name_of_stored_procedure.database_name = 'byod'
AND name_of_stored_procedure.folder = '/DataSources/Name_of_the_folder_of_file_type'
AND name_of_stored_procedure.base_view_folder = '/BaseViews/Name_of_the_folder_of_file_type'
For Excel files it is necessary to specify the type of file (type_of_file), which can have two values: "Excel 97-2003 (*.xls)" and "Excel 2007 or later (*.xlsx)".
For Excel, the following condition should be added in the where clause along with the other obligatory conditions:
((sp_for_excel.type_of_file = 'Excel 2007 or later (*.xlsx)' AND iv_elements_to_be_created.extension = 'xlsx')
OR (sp_for_excel.type_of_file = 'Excel 97-2003 (*.xls)' AND iv_elements_to_be_created.extension = 'xls'))
To feed any Stored Procedure attribute not covered by the JOIN condition, or to assign static values to the required parameters, value = storedProcedure.attribute can be added to the WHERE clause.
Note: Value could be for example a result from the Filesystem custom wrapper. If the values of the parameters should be assigned dynamically, view parameters can be created and added to the WHERE clause: requiredParameter = viewParameter.
The values of the parameters input_path and is_recursive from the view iv_elements_to_be_created are dynamically assigned to the related attributes by adding this to the WHERE clause:
iv_elements_to_be_created.recursive = iv_elements_to_be_created.is_recursive
AND iv_elements_to_be_created.parentfolder = iv_elements_to_be_created.input_path
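Putting these pieces together, a sketch of the complete WHERE clause for the Delimited Files join view (the stored procedure alias sp_for_df and the folder paths are illustrative; adapt them to your implementation):

sp_for_df.database_name = 'byod'
AND sp_for_df.folder = '/02 - Integration/01 - Final View/01 - Data Source/Delimited Files'
AND sp_for_df.base_view_folder = '/02 - Integration/01 - Final View/02 - Base View/Delimited Files'
AND iv_elements_to_be_created.extension = 'csv'
AND iv_elements_to_be_created.recursive = iv_elements_to_be_created.is_recursive
AND iv_elements_to_be_created.parentfolder = iv_elements_to_be_created.input_path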
Cache configuration based on filename
In some scenarios we might need to enable the cache for the views that we will be creating on top of the file. We will use file naming conventions to specify the cache configuration. For instance:
- If the cache does not need to be enabled the file can be named however the user wants.
- If full cache should be enabled the file name must contain “_full” in it.
- For partial cache, “_partial” should appear somewhere in the name.
- And so on for the different cache options to be enabled.
To include cache configuration in our view we can add the following field expression to the result field of the generate_baseview_type_of_file_extension join views that we have created.
- Go to Edit > Output
- Go to the result field and click on the Edit button
- Add the following Field expression:
CASE
  WHEN (filename like '%_full%') THEN replace(result, 'CACHE OFF', 'CACHE FULL')
  WHEN (filename like '%_partialexactpreload%') THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL EXACT PRELOAD')
  WHEN (filename like '%_partialpreload%') THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL PRELOAD')
  WHEN (filename like '%_partialexact%') THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL EXACT')
  WHEN (filename like '%_partial%') THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL')
  ELSE result
END
- Click Ok.
- Save the changes.
Figure 5.5 Shows the creation of the new field for the cache configuration VQL
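For example, with this expression in place, a file named sales_full.csv produces a base view whose generated VQL contains CACHE FULL, while a file named sales.csv leaves the cache disabled.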
Step 6: Automating the view creation using Scheduler
So far, we have created a model in the ‘byod_setup’ database that is able to generate VQL statements to create base views automatically. We have built:
- iv_elements_to_be_created: a view that returns the list of files without an associated base view.
- sp_type_of_file: a different stored procedure for each type of file.
- generate_baseview_type_of_file_extension: a view for each type of file that generates the VQL code that needs to be executed in order to create base views and data sources for new files from the folder.
However, the actual data sources and base views for the respective files have not been created yet. The next steps will briefly describe the whole process.
Creating Base Views automatically
To create the data sources and base views we are going to use some Denodo Scheduler jobs. The Denodo Scheduler Server and the Denodo Scheduler Administration Tool must be running.
- From the Scheduler Administration Tool go to Jobs > Add job > VDP.
- Choose the project, name the job and give a description.
- Go to the Extraction section.
- Choose VDP as a Data source.
- Set the Parameterized query with just a parameter name (e.g. @queryParam).
- Go to Sources > Add source and choose VDP:
- Under Query (non parameterized) enter:
SELECT 0 AS number, 'CONNECT DATABASE byod;' AS result
UNION
SELECT rownum() AS number, result
FROM byod_setup.generate_baseview_type_of_file_extension
WHERE generate_baseview_type_of_file_extension.input_path = 'B:\'
  AND generate_baseview_type_of_file_extension.is_recursive = false
ORDER BY number ASC
CONTEXT('cache_wait_for_load' = 'true') TRACE
- Click on Add new mapping in the Mappings section.
- Set the Query parameter as queryParam.
- Set the Source parameter as result.
- Click Save
Figure 6.1 Create a Scheduler job.
Figure 6.2 Define a parameterized query.
Option 1: Triggers Section
If periodic creation of base views is needed, the Triggers section can be set up. The Triggers section allows the user to automate the execution of Scheduler jobs. The document Time-based Scheduling Section (Triggers Section) explains how to use and set up the Cron Expression parameters.
- Open the job for which you want to set up the trigger.
- Go to Edit > Triggers Section > Add trigger.
- Click on Save.
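For reference, the trigger uses Quartz-style cron expressions; an expression that fires every 15 minutes, as in Figure 6.4, would be 0 0/15 * * * ?.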
Figure 6.3 Steps to set up a trigger.
Figure 6.4 Shows the Cron Expression for executing a job every 15 minutes
Option 2: Automating the view creation using a Python Script
Windows version
Another alternative to automate the process, instead of using triggers, is to use a Python script.
This script detects when a file is created or when the name of a file has changed. In this case, we use a modified version of the second function described in the article Watch a Directory for Changes.
Prerequisites
First, we have to install the following Python libraries; we recommend following their documentation:
- Pywin32: this library provides access to many of the Windows APIs from Python.
- Requests: it allows you to send HTTP/1.1 requests.
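Both libraries can typically be installed with pip:

pip install pywin32 requests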
Python Script in detail
First, we import the Python libraries.
import os
import win32file
import win32con
import requests
Then, we copy the second function described in the section Use the ReadDirectoryChanges API of the article Watch a Directory for Changes, up to the for loop (see the mentioned article for more details about this part).
ACTIONS = {
    1: "Created",
    2: "Deleted",
    3: "Updated",
    4: "Renamed from something",
    5: "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

path_to_watch = "."
hDir = win32file.CreateFile(
    path_to_watch,
    FILE_LIST_DIRECTORY,
    win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
    None,
    win32con.OPEN_EXISTING,
    win32con.FILE_FLAG_BACKUP_SEMANTICS,
    None
)

while 1:
    results = win32file.ReadDirectoryChangesW(
        hDir,
        1024,
        True,
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
        win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
        win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
        win32con.FILE_NOTIFY_CHANGE_SIZE |
        win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
        win32con.FILE_NOTIFY_CHANGE_SECURITY,
        None,
        None)
Finally, we modify the for loop used in the mentioned article to detect whether a file has been created or renamed and, based on the type of the file (csv, json, xml or excel), start its corresponding job.
for action, file in results:
    full_filename = os.path.join(path_to_watch, file)
    # Check if the file has been created or renamed
    if ACTIONS.get(action, "Unknown") == "Created" or ACTIONS.get(action, "Unknown") == "Renamed to something":
        file_type = file.split('.')[-1]  # Extract the file extension from the name
        if file_type == "csv":
            # When the file is a csv, call the API and execute job 10
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "json":
            # When the file is a json, call the API and execute job 12
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "xlsx":
            # When the file is an Excel file, call the API and execute job 113
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "xml":
            # When the file is an xml, call the API and execute job 112
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
In the block of code above, we first check whether the file has been created or renamed. Then, if the file is a csv, we call the Scheduler API to execute the job with ID 10, which is the job that creates the data sources and base views for csv files. If the file is a json, we call the Scheduler API to start the job with ID 12, which does the same for json files. If it is an excel or an xml file, we execute its corresponding job in the same way.
To start the jobs we use the endpoint of the Denodo Scheduler API:
- PUT /webadmin/denodo-scheduler-admin/public/api/projects/{projectID}/jobs/{jobID}/status
This endpoint allows changing the status of one or more jobs, e.g. starting multiple jobs at the same time. To use it, we have to provide the host and port where the Scheduler server is running, as well as the project ID and the job ID. Note that the server’s uri must also be provided: localhost to access a local Scheduler server and port 8000 as the default port, although these values might change in your environment.
Then, we start a session, provide the Scheduler server credentials and, in the put function of the Requests library, specify the following parameters:
- url: is the url of the endpoint of the Denodo Scheduler API.
- headers:{'accept':'application/json','Content-Type':'application/json'}
- json: {'action':'start'} is the JSON body used to start the job.
Finally, we check if the API request has been successfully done (status 204).
Complete Script
import os
import win32file
import win32con
import requests

ACTIONS = {
    1: "Created",
    2: "Deleted",
    3: "Updated",
    4: "Renamed from something",
    5: "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

path_to_watch = "."
hDir = win32file.CreateFile(
    path_to_watch,
    FILE_LIST_DIRECTORY,
    win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
    None,
    win32con.OPEN_EXISTING,
    win32con.FILE_FLAG_BACKUP_SEMANTICS,
    None
)

while 1:
    results = win32file.ReadDirectoryChangesW(
        hDir,
        1024,
        True,
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
        win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
        win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
        win32con.FILE_NOTIFY_CHANGE_SIZE |
        win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
        win32con.FILE_NOTIFY_CHANGE_SECURITY,
        None,
        None)
    for action, file in results:
        full_filename = os.path.join(path_to_watch, file)
        # Check if the file has been created or renamed
        if ACTIONS.get(action, "Unknown") == "Created" or ACTIONS.get(action, "Unknown") == "Renamed to something":
            file_type = file.split('.')[-1]  # Extract the file extension from the name
            if file_type == "csv":
                # When the file is a csv, call the API and execute job 10
                url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
                s = requests.Session()
                s.auth = ('admin', 'admin')
                r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
                if r.status_code == 204:
                    print(full_filename, ACTIONS.get(action, "Unknown"))
                else:
                    print("Error executing job")
            if file_type == "json":
                # When the file is a json, call the API and execute job 12
                url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
                s = requests.Session()
                s.auth = ('admin', 'admin')
                r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
                if r.status_code == 204:
                    print(full_filename, ACTIONS.get(action, "Unknown"))
                else:
                    print("Error executing job")
            if file_type == "xlsx":
                # When the file is an Excel file, call the API and execute job 113
                url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
                s = requests.Session()
                s.auth = ('admin', 'admin')
                r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
                if r.status_code == 204:
                    print(full_filename, ACTIONS.get(action, "Unknown"))
                else:
                    print("Error executing job")
            if file_type == "xml":
                # When the file is an xml, call the API and execute job 112
                url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
                s = requests.Session()
                s.auth = ('admin', 'admin')
                r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
                if r.status_code == 204:
                    print(full_filename, ACTIONS.get(action, "Unknown"))
                else:
                    print("Error executing job")
Script execution output
Once the script is ready, follow these steps to execute it:
- Open a terminal.
- Go to the directory that you want to monitor.
- Activate the python environment.
- Execute the script with the command: python <script_name>
The screenshot below shows the output of the script when a file is created or renamed in the directory.
Figure 6.5: Script execution
Linux version
Prerequisites
We can also make a script compatible with Linux systems. We will need to use the following Python libraries:
- Watchdog: this library provides a Python API and shell utilities to monitor file system events.
- Requests: it allows you to send HTTP/1.1 requests.
- Time: This module provides various time-related functions.
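The first two can typically be installed with pip (the time module is part of the Python standard library and needs no installation):

pip install watchdog requests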
Python Script in detail
First, we import the Python libraries.
import time
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
Then, we create a class OnMyWatch, which watches for any file system change and dispatches the event to the event handler. Then, we create a Handler class, an object that will be notified when something happens to the file system.
In this case we use two methods for monitoring changes: on_created detects when a file or a directory is created, and on_modified is executed when a file is modified or a directory is renamed. Each of those methods receives the event object as its first parameter, and the event object has 3 attributes:
- event_type: modified/created
- is_directory: True/False
- src_path: path/to/observe/file
class OnMyWatch:
    watchDirectory = "."  # Replace with your desired directory path

    def __init__(self):
        self.observer = Observer()

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, self.watchDirectory, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.observer.stop()
            print("Observer Stopped")
        self.observer.join()

class Handler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"Detected change: modified - {event.src_path}")
            send_api_request(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            print(f"Detected file creation: - {event.src_path}")
            send_api_request(event.src_path)
Finally, we implement a function with conditions that detect the type of the file that has been created or modified and then execute its corresponding Scheduler job.
def send_api_request(src_path):
    file_type = src_path.split(".")[-1]  # Get the file extension
    if file_type == "csv":
        # When the file is a csv, call the API and execute job 10
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "json":
        # When the file is a json, call the API and execute job 12
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xml":
        # When the file is an xml, call the API and execute job 112
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xlsx":
        # When the file is an xlsx, call the API and execute job 113
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
    else:
        print(f"Ignoring file with unsupported extension: {src_path}")
        return
    s = requests.Session()
    s.auth = ('admin', 'admin')
    try:
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
            print("API request sent successfully")
        else:
            print(f"Failed to send API request. Status code: {r.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error sending API request: {e}")

if __name__ == '__main__':
    watch = OnMyWatch()
    watch.run()
Complete Script
import time
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class OnMyWatch:
    # Set the directory to watch
    watchDirectory = "."  # Replace with your desired directory path

    def __init__(self):
        self.observer = Observer()

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, self.watchDirectory, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.observer.stop()
            print("Observer Stopped")
        self.observer.join()

class Handler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"Detected change: modified - {event.src_path}")
            send_api_request(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            print(f"Detected file creation: - {event.src_path}")
            send_api_request(event.src_path)

def send_api_request(src_path):
    file_type = src_path.split(".")[-1]  # Get the file extension
    if file_type == "csv":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "json":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xml":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xlsx":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
    else:
        print(f"Ignoring file with unsupported extension: {src_path}")
        return
    s = requests.Session()
    s.auth = ('admin', 'admin')
    try:
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
            print("API request sent successfully")
        else:
            print(f"Failed to send API request. Status code: {r.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error sending API request: {e}")

if __name__ == '__main__':
    watch = OnMyWatch()
    watch.run()
Script execution output
Once the script is ready, follow these steps to execute it:
- Open a terminal.
- Go to the directory that you want to monitor.
- Activate the python environment.
- Execute the script with the command: python3 <script_name>
The screenshot below shows the output of the script when a file is created or modified in the directory.
Step 7: Deletion of views and data sources
A Scheduler job for the deletion of base views and data sources can also be created. If a file in the specified directory is deleted, then all the related data sources and base views can be deleted too. To make this automatic as well, the data sources should be dropped with the CASCADE option so that all the related views are also deleted.
For accuracy, it is recommended to create a view that shows the data sources to be deleted. To do so, use the GET_ELEMENTS() stored procedure to get all the data sources that have already been created, and then compare the results against the iv_list_of_files view. Only the files that have been deleted will be shown.
- Open a VQL Shell and run the query below:
CREATE OR REPLACE VIEW iv_elements_to_be_deleted
FOLDER = '/02 - integration/03 - files deletion'
AS SELECT name, subtype, path_to_file
FROM get_elements()
WHERE (type = 'datasource'
  AND (folder = '/01 - integration/final view/01 - data sources/xml'
    OR folder = '/01 - integration/final view/01 - data sources/delimited files'
    OR folder = '/01 - integration/final view/01 - data sources/json'
    OR folder = '/01 - integration/final view/01 - data sources/excel')
  AND input_database_name = 'byod'
  AND name NOT IN (
    SELECT name_for_new_datasource
    FROM iv_list_of_files
    WHERE (recursive = false AND parentfolder = path_to_file)))
USING PARAMETERS (path_to_file : text);
Note: The name_for_new_datasource field we created earlier in the example holds the exact name of a data source; if you decided to name the data sources any other way, you have to use an attribute that contains the exact name of the data source. If you have partitioned your files in a different way, you need to adapt this section: (folder = '/datasources/xml' OR folder = '/datasources/delimited_files' OR folder = '/datasources/json' OR folder = '/datasources/excel'). Generally speaking, folder = 'path/to/folder', where 'path/to/folder' is the exact path shown in the data source’s Configuration > Metadata > Folder.
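To preview what would be dropped, the view can be queried directly, for example:

SELECT name, subtype
FROM iv_elements_to_be_deleted
WHERE path_to_file = 'B:\Bring Your Own Data';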
- Open the Denodo Scheduler and create a new Job:
- Click on Jobs > Add job > VDP.
- Choose the project, name the job and give a description.
- Go to the Extraction section.
- Choose VDP as a Data source.
- Set Parameterized query as DROP DATASOURCE @subtype "@param" CASCADE.
- Click on Add source and choose VDP as Data source.
- Enter in the Query (non parameterized) field:
SELECT name, subtype
FROM byod_setup.iv_elements_to_be_deleted
WHERE path_to_file = 'path/to/file'
- Click on Add new mapping: enter param as Query parameter, enter name as Source parameter.
- Click on Add new mapping: enter subtype as Query parameter, enter subtype as Source parameter.
- Click on Save.
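For illustration, with a source row where subtype is 'json' and name is 'ds_customers' (hypothetical values), the parameterized query would resolve to:

DROP DATASOURCE JSON "ds_customers" CASCADE;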
Figure 7.1 Deletion job creation
Figure 7.2 Mapping creation
Conclusions
In this document we have seen how we can automatically create the necessary data sources and base views to integrate information coming from files that are added to a repository in a dynamic way.
First, we have created the necessary views to get the information from the files that are being added to the repository and to construct the VQL for the data sources and views that will be used to integrate the files.
Then, we have created Scheduler jobs to periodically query these views and execute the VQL obtained from them, actually creating the elements needed to integrate the information coming from those files. As an alternative, a Python script implementation was also presented (with different versions for Linux and Windows).
The specific example presented in this document can be adapted to fit different scenarios depending on the type of files that will be accessed, how they will be added to the data set and so on.
The information provided in the Denodo Knowledge Base is intended to assist our users in advanced uses of Denodo. Please note that the results from the application of processes and configurations detailed in these documents may vary depending on your specific environment. Use them at your own discretion.
For an official guide of supported features, please refer to the User Manuals. For questions on critical systems or complex environments we recommend that you contact your Denodo Customer Success Manager.