
Objective

In this article we describe, step by step, how to build a workflow that automates the import of datasets such as JSON, CSV, XML and Excel files into the Denodo Catalog. This enables Denodo business power users to consume and combine files in an easy way through the Denodo Platform.

For an existing folder that contains different file types, this feature creates Base Views automatically.   

Implementing a “Bring Your Own Data” workflow

Pre-configuration steps

In order to implement this workflow, it is necessary to:

  1. Download and Install the FileSystem Custom Wrapper
  2. Download and Install the VQL Generation Stored Procedures

Note: The linked pages highlight the steps to follow in case the Stored Procedures and the Custom Wrapper are not already installed.

Implementation

In this example, files are placed in a local path, that is, a folder accessible to the VDP server. This can be, for example, a network drive accessible to users and to the VDP server(s). The figure below displays the location of the files and the files that are available. The full path of the files is: B:\Bring Your Own Data.

Figure 1.1 Bring Your Own Data folder

To implement this workflow we will use 2 virtual databases. The first one, 'byod_setup', contains the data sources, base views, stored procedures, etc. that are instrumental to generate the mapping data sources and views in sync with the files. The second one, 'byod', will contain the automatically generated data sources and base views mapping the files from the file resource location. In this example: 'B:\Bring Your Own Data'.

To create the two databases the following steps should be taken:

  1. Open the Denodo Design Studio
  2. Go to Administration > Database Management
  3. Click on New 
  4. Enter the name byod_setup and a description.
  5. Click on the Ok button.
  6. Redo steps 3 - 5 for the database named ‘byod’.

For a better overview at the end of this implementation, it is recommended to organize your newly created data sources, base views, stored procedures, etc. in separate Denodo folders in the Virtual DataPort Administration Tool. This article follows the VDP Naming Convention.

The image below describes the folder structure for the ‘byod_setup’ and the ‘byod’ virtual databases.

 

Figure 1.2 Describes the database structure

The VQL code below will create the folders described for the 'byod_setup' database.

CREATE OR REPLACE FOLDER '/01 - Connectivity' DESCRIPTION 'This folder stores the data sources and base views of the project';
CREATE OR REPLACE FOLDER '/01 - Connectivity/01 - Data Sources' DESCRIPTION 'This folder stores the data sources of the project';
CREATE OR REPLACE FOLDER '/01 - Connectivity/02 - Base Views' DESCRIPTION 'This folder stores the base views of the project';
CREATE OR REPLACE FOLDER '/02 - Integration' DESCRIPTION 'The integration views created for combining the different base views are stored in this folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Files' DESCRIPTION 'This folder stores the view listing all files of the local folder';
CREATE OR REPLACE FOLDER '/02 - Integration/02 - Files Extraction' DESCRIPTION 'This folder stores the view listing the files that should be created';
CREATE OR REPLACE FOLDER '/02 - Integration/03 - Files Deletion' DESCRIPTION 'This folder stores the view listing the files that should be deleted';
CREATE OR REPLACE FOLDER '/02 - Integration/04 - VQL Generator' DESCRIPTION 'This folder stores the view returning the VQL for creating data sources and base views of the files';
CREATE OR REPLACE FOLDER '/03 - Stored Procedures' DESCRIPTION 'This folder stores Stored procedures';

The VQL code below will create the folders described for the byod database.

CREATE OR REPLACE FOLDER '/02 - Integration' DESCRIPTION 'The integration views created for combining the different base views are stored in this folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View' DESCRIPTION 'This folder stores the view listing all files of the local folder';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source' DESCRIPTION 'This folder stores the data sources of the database';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View' DESCRIPTION 'This folder stores the base views of the database';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/Delimited Files' DESCRIPTION 'This folder stores the data sources of Delimited Files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/Excel' DESCRIPTION 'This folder stores the data sources of Excel files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/JSON' DESCRIPTION 'This folder stores the data sources of JSON files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/01 - Data Source/XML' DESCRIPTION 'This folder stores the data sources of XML files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/Delimited Files' DESCRIPTION 'This folder stores the base views of Delimited Files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/Excel' DESCRIPTION 'This folder stores the base views of Excel files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/JSON' DESCRIPTION 'This folder stores the base views of JSON files';
CREATE OR REPLACE FOLDER '/02 - Integration/01 - Final View/02 - Base View/XML' DESCRIPTION 'This folder stores the base views of XML files';

The next steps present the configuration of the data sources, views and stored procedures needed to set up the 'byod_setup' database.

Step 1: Create a data source to extract a list of files

  1. Open the Web Design Studio or the Virtual DataPort Administration Tool.
  2. Click the + sign beside the 'byod_setup' database.
  3. Click on the ellipsis icon or right-click on /01 - connectivity/01 - data sources > New > Data source > Custom.
  4. A new window ("new_custom_datasource") will open, in which:
    1. Specify the name of the Custom Data Source (e.g. ds_localfolder).
    2. Check the box "Select jars".
    3. Choose the existing extension denodo-filesystem-customwrapper.
    4. As "Class Name" select com.denodo.connect.filesystem.ReadFileSystemConnector.
  5. Click Save.

Figure 1.3 Shows how to open a “new_custom_datasource”

Figure 1.4 Creates the custom data source.

Step 2: Create the Base View to retrieve a list of all files

  1. Open the just-created data source ds_localfolder.        
  2. Click on Create base view. 
  3. Edit the name (e.g. bv_localfolder).
  4. Click on Metadata and choose /01 - connectivity/02 - base views as the folder location.
  5. Click Save.

To check that everything works, you can execute the base view that we just created. Go to the Execution panel, enter parentfolder = 'B:\Bring Your Own Data' and recursive = false as the where condition, and click on Execute.

(Note: The recursive parameter should be set to true if the parent folder contains other folders. If this is the case, files from all the subfolders will be added to the returned list.)

Figure 2.1   Describes the output after executing the Base View

  1. Create a derived view iv_list_of_files. This view will return the list of files from the folder(s), but will also return the data source names and base view names for the files using naming conventions.
  2. Set iv_list_of_files to auto-generate the correct names for the files:
    1. Go to the Output tab.
    2. Click on New > New Field.
    3. Choose a Field Name, for example name_for_new_baseview.
    4. Add the following expression for the new field:

'bv_' || regexp(filename, '\.[^.]*$', '')

    5. Click Ok.
    6. Repeat the same steps to create a new field for the data source name, name_for_new_datasource. In this case, the expression is:

'ds_' || regexp(filename, '\.[^.]*$', '')

    7. Click on Metadata and choose /02 - integration/02 - files extraction as the folder location.
    8. Click Save.
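As a quick illustration outside of Denodo, the naming convention used by these expressions can be mimicked in plain Python (names_for_file is a hypothetical helper; in VDP the same transformation is done by the regexp function shown above):

```python
import re

def names_for_file(filename: str) -> tuple:
    """Strip the last extension and derive the base view and data source
    names using the bv_/ds_ prefix convention (illustrative helper)."""
    stem = re.sub(r'\.[^.]*$', '', filename)  # drop the trailing extension
    return 'bv_' + stem, 'ds_' + stem

print(names_for_file('sales_2023.csv'))  # ('bv_sales_2023', 'ds_sales_2023')
```

Only the last extension is removed, so a name like report.final.xlsx keeps its inner dot, just as the VDP expression does.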

Figure 2.2 Shows the steps to create the new field with the base view names.

Step 3: Create the Stored Procedure to generate the VQLs

  1. Go to the folder /03 - stored procedures.
  2. Click on the ellipsis icon and go to New > Stored Procedure > Java Stored Procedure.
  3. Specify the name (e.g. sp_type_of_file).
  4. Check the box "Select Jars".
  5. Select denodo-vqlgeneration.
  6. Select one of the following as the Class Name:
    1. com.denodo.connect.vqlgeneration.df.GenerateVQLForDFFullDataSourceBaseViewStoredProcedure for Delimited Files.
    2. com.denodo.connect.vqlgeneration.excel.GenerateVQLForExcelFullDataSourceBaseViewStoredProcedure for Excel files.
    3. com.denodo.connect.vqlgeneration.json.GenerateVQLForJSONFullDataSourceBaseViewStoredProcedure for JSON files.
    4. com.denodo.connect.vqlgeneration.xml.GenerateVQLForXMLFullDataSourceBaseViewStoredProcedure for XML files.
  7. Click Save.
  8. Repeat these steps for each type of file, until 4 new stored procedures are created.

Figure 3.1 Shows how to import a new Java stored procedure

Figure 3.2 Describes how to create the stored procedure for Excel files

To verify that everything works fine, specify all the necessary attributes and execute the stored procedure.

When the base view listing the files is executed, the output will include all the files in the folder, even those for which base views were already created. This affects the whole process. A more elegant solution is to create a view that returns only the files that do not yet have a base view, so that views and data sources that already exist are not recreated.

Step 4:  Creating a view to display only new files

Note: The names of the data sources are created by adding the prefix ds_ and removing the extension.

Using this naming convention, the name of the data source can be mapped back to the original filename. This lets us use the GET_ELEMENTS stored procedure to retrieve only the files without a previously created data source. The following code extracts data source names from GET_ELEMENTS.

  1. We want to get existing data sources, so set type = 'datasource'.
  2. Execute the code below in the VQL Shell.

CREATE VIEW iv_elements_to_be_created AS
SELECT * FROM iv_list_of_files
WHERE (
    parentfolder = input_path
    AND recursive = is_recursive
    AND name_for_new_datasource NOT IN (
        SELECT name FROM get_elements() WHERE type = 'datasource'
    )
)
USING PARAMETERS ( input_path : text, is_recursive : text );

This view will only return the list of files for which no view or data source has been created yet.
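The filtering logic of this view can be sketched in plain Python: given the folder listing and the data source names already registered (in VDP, the result of GET_ELEMENTS), keep only the files whose derived ds_ name does not exist yet. The function and sample names below are hypothetical, for illustration only:

```python
def files_to_be_created(filenames, existing_datasources):
    """Return the files whose derived data source name (ds_ prefix,
    extension stripped) is not among the existing data sources."""
    existing = set(existing_datasources)
    new_files = []
    for f in filenames:
        ds_name = 'ds_' + f.rsplit('.', 1)[0]  # same ds_ naming convention
        if ds_name not in existing:
            new_files.append(f)
    return new_files

files = ['orders.csv', 'clients.json', 'stock.xml']
existing = ['ds_orders']
print(files_to_be_created(files, existing))  # ['clients.json', 'stock.xml']
```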

Figure 4.1 Shows the replace function

Now we have to join the view iv_elements_to_be_created, which gets the list of new files, with the stored procedure that generates the VQL code.

Step 5:  Get the VQL code for each new file

We are going to create a new join view for each type of file/data source, called generate_baseview_type_of_file_extension. All joins should be named similarly, changing only the extension suffix to refer to the actual file type. The following steps join iv_elements_to_be_created with the stored procedure that obtains the VQL; create a new join for each stored procedure previously created for the different file types.

Go to the VQL Generators folder and select New > Join:

  1. Drag and drop the view iv_elements_to_be_created and the stored procedure for the file type that we are going to import.
  2. Uncheck the box for "Use fixed input parameters" when you drag and drop the stored procedure.
  3. Connect these two through an arrow.
  4. Click on the Join Condition icon.
  5. Match the attributes, through drag and drop, so that the following conditions are set:

iv_elements_to_be_created.fullpath = storedProcedure.file_path_url and
iv_elements_to_be_created.name_for_new_baseview = storedProcedure.base_view_name and
iv_elements_to_be_created.name_for_new_datasource = storedProcedure.datasource_name

Figure 5.1 Shows how to create a new Join view.

Figure 5.2 Shows the Use fixed input parameters box you need to uncheck.

Figure 5.3 Shows how to set the join condition.

Figure 5.4 Describes the Where Condition of generate_baseview_df

Now all the obligatory conditions should be specified in the where clause, along with the following condition: iv_elements_to_be_created.extension = extensionSupportedByStoredProcedure.

All the attributes of every stored procedure are described in the document Denodo VQL Generation Stored Procedures - User Manual. Choose the ones right for your implementation.

Note: These conditions should be set like this:

name_of_stored_procedure.database_name = 'byod' AND
name_of_stored_procedure.folder = '/DataSources/Name_of_the_folder_of_file_type' AND
name_of_stored_procedure.base_view_folder = '/BaseViews/Name_of_the_folder_of_file_type'

 

For Excel files it is necessary to specify the type of file (type_of_file), which can have two values: "Excel 97-2003 (*.xls)" and "Excel 2007 or later (*.xlsx)".

For Excel, the following condition should be added in the where clause along with the other obligatory conditions:

((sp_for_excel.type_of_file = 'Excel 2007 or later (*.xlsx)' AND iv_elements_to_be_created.extension = 'xlsx') OR (sp_for_excel.type_of_file = 'Excel 97-2003 (*.xls)' AND iv_elements_to_be_created.extension = 'xls'))

To feed every Stored Procedure attribute in the JOIN condition or to assign static values to the required parameters,  value = storedProcedure.attribute can be added in the WHERE clause.

Note: Value could be for example a result from the Filesystem custom wrapper. If the values of the parameters should be assigned dynamically, view parameters can be created and added to the WHERE clause: requiredParameter = viewParameter. 


The values of the parameters input_path and is_recursive from the view iv_elements_to_be_created are dynamically assigned to the related attributes by adding this to the WHERE clause:

iv_elements_to_be_created.recursive = iv_elements_to_be_created.is_recursive AND iv_elements_to_be_created.parentfolder = iv_elements_to_be_created.input_path

Cache configuration based on filename

In some scenarios we might need to enable the cache for the views that we will be creating on top of the files. We will use file naming conventions to specify the cache configuration. For instance:

  • If the cache does not need to be enabled, the file can be named however the user wants.
  • If full cache should be enabled, the file name must contain "_full".
  • For partial cache, "_partial" should appear somewhere in the name.
  • And so on for the different cache options to be enabled.
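The convention can be sketched as a small Python lookup (cache_mode is a hypothetical helper; note that the more specific markers such as "_partialexactpreload" must be tested before "_partial", mirroring the order of the WHEN branches in the field expression shown further down):

```python
def cache_mode(filename: str) -> str:
    """Map filename markers to a cache mode, most specific marker first
    (illustrative helper mirroring the CASE field expression)."""
    markers = [
        ('_partialexactpreload', 'CACHE PARTIAL EXACT PRELOAD'),
        ('_partialpreload', 'CACHE PARTIAL PRELOAD'),
        ('_partialexact', 'CACHE PARTIAL EXACT'),
        ('_partial', 'CACHE PARTIAL'),
        ('_full', 'CACHE FULL'),
    ]
    for marker, mode in markers:
        if marker in filename:
            return mode
    return 'CACHE OFF'  # no marker: leave the cache disabled

print(cache_mode('sales_full.csv'))     # CACHE FULL
print(cache_mode('sales_partial.csv'))  # CACHE PARTIAL
print(cache_mode('sales.csv'))          # CACHE OFF
```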

To include cache configuration in our view we can add the following field expression to the result field of the generate_baseview_type_of_file_extension join views that we have created.

  1. Go to Edit > Output.
  2. Go to the result field and click on the Edit button.
  3. Add the following Field expression:

CASE
    WHEN (filename like '%_full%')
        THEN replace(result, 'CACHE OFF', 'CACHE FULL')
    WHEN (filename like '%_partialexactpreload%')
        THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL EXACT PRELOAD')
    WHEN (filename like '%_partialpreload%')
        THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL PRELOAD')
    WHEN (filename like '%_partialexact%')
        THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL EXACT')
    WHEN (filename like '%_partial%')
        THEN replace(result, 'CACHE OFF', 'CACHE PARTIAL')
    ELSE result
END

  4. Click Ok.
  5. Save the changes.

Figure 5.5 Shows the creation of the new field for the cache configuration VQL

Step 6: Automating the view creation using Scheduler

So far, we created a model in the 'byod_setup' database that is able to generate VQL statements to create base views automatically. We have built:

  • iv_elements_to_be_created: a view that returns the list of files without an associated base view. 
  • sp_type_of_file: a different stored procedure for each type of file.
  • generate_baseview_type_of_file: a view for each type of file that generates the VQL code that needs to be executed in order to create base views and data sources for new files from the folder.

However, the actual data sources and base views for the respective files were not created yet. The next steps will briefly describe the whole process.

Creating Base Views automatically

To create the data sources and base views we are going to use some Denodo Scheduler jobs. The Denodo Scheduler Server and the Denodo Scheduler Administration Tool must be running.

  1. From the Scheduler Administration Tool go to Jobs > Add job > VDP.
  2. Choose the project, name the job and give a description.
  3. Go to the Extraction section.
  4. Choose VDP as Data source.
  5. Set the Parameterized query with just a parameter name (e.g. @queryParam).
  6. Go to Sources > Add source and choose VDP:

    1. Under Query (non parameterized) enter:

select 0 as number, 'CONNECT DATABASE byod;' as result
UNION
select rownum() as number, result
FROM byod_setup.generate_baseview_type_of_file
WHERE generate_baseview_type_of_file.input_path = 'B:\' and generate_baseview_type_of_file.is_recursive = false
order by number ASC
CONTEXT('cache_wait_for_load' = 'true') TRACE

    2. Click on Add new mapping in the Mappings section.
    3. Set the Query parameter as queryParam.
    4. Set the Source parameter as result.
  7. Click Save.

Figure 6.1 Create a Scheduler job.

Figure 6.2 Define a parameterized query.

Option 1: Triggers Section

If a periodical creation of base views is needed, the Trigger section can be set up. The Triggers Section allows the user to automate the execution of the Scheduler jobs.  The document Time-based Scheduling Section (Triggers Section) explains how to use and set up Cron Expression parameters.

  1. Open the job for which you want to set up the trigger.
  2. Go to Edit > Triggers Section >  Add trigger.
  3. Click on Save.

Figure 6.3 Steps to set up a trigger.

Figure 6.4 Shows the Cron Expression for executing a job every 15 minutes

Option 2: Automating the view creation using a Python Script

Windows version

Another alternative to automate the process, instead of using triggers, is to use a Python Script.

This script will detect if a file is created or if the name of the file has been changed. In this case, we are using a modified version of the second function described in the article  Watch a Directory for Changes.

Prerequisites

First, we have to install the following Python libraries; we recommend following their documentation:

  • Pywin32: this library provides access to many of the Windows APIs from Python.
  • Requests: it allows you to send HTTP/1.1 requests.

Python Script in detail

First, we import the Python libraries.

import os
import win32file
import win32con
import requests

Then, we have to copy the second function described in the section Use the ReadDirectoryChanges API of the article Watch a Directory for Changes, up to the for loop (read the article for more details about this part).

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}

FILE_LIST_DIRECTORY = 0x0001
path_to_watch = "."
hDir = win32file.CreateFile(
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

while 1:
  results = win32file.ReadDirectoryChangesW(
    hDir,
    1024,
    True,
    win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
    win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
    win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
    win32con.FILE_NOTIFY_CHANGE_SIZE |
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
    win32con.FILE_NOTIFY_CHANGE_SECURITY,
    None,
    None
  )

Finally, we modify the for loop used in the article mentioned, in order to detect whether a file is Created or Renamed and, based on the type of the file (csv, json, xml or excel), start its corresponding job.

for action, file in results:
    full_filename = os.path.join(path_to_watch, file)
    # check if the file has been created or renamed
    if ACTIONS.get(action, "Unknown") == "Created" or ACTIONS.get(action, "Unknown") == "Renamed to something":
        file_type = file.split('.')[-1]  # extract the file type from the name
        if file_type == "csv":  # when the file is a csv, call the api and execute job 10
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "json":  # when the file is a json, call the api and execute job 12
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "xlsx":  # when the file is an excel, call the api and execute job 113
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")
        if file_type == "xml":  # when the file is an xml, call the api and execute job 112
            url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
            s = requests.Session()
            s.auth = ('admin', 'admin')
            r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
            if r.status_code == 204:
                print(full_filename, ACTIONS.get(action, "Unknown"))
            else:
                print("Error executing job")

In the block of code above, we first check whether the file has been Created or Renamed. Then, if the file is a csv, we call the Scheduler API to execute the job with ID 10, which creates the data sources and base views for csv files. If the file is a json, we call the Scheduler API to start the job with ID 12, which does the same for json files. If it is an Excel or an XML file, we execute the corresponding job in the same way.

To start the jobs we use the endpoint of the Denodo Scheduler API:

  • PUT /webadmin/denodo-scheduler-admin/public/api/projects/{projectID}/jobs/{jobID}/status

This endpoint allows changing the status of one or more jobs, e.g. starting multiple jobs at the same time. To use it, we have to provide the host and port where the Scheduler server is running, as well as the project ID and the job ID. It is important to know that the server's uri must also be provided: localhost for a local Scheduler server and port 8000 for the default port, although these values might change.
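The %2F%2Flocalhost%3A8000 fragment seen in the job URLs is simply the URL-encoded form of the server uri //localhost:8000. Rather than hard-coding it, the URL can be built with the standard library; the host, project and job values below are the ones assumed in this example:

```python
from urllib.parse import quote

scheduler_host = "localhost:9090"  # Scheduler web admin host:port (assumed)
project_id, job_id = 1, 10         # example project and job IDs
server_uri = "//localhost:8000"    # Scheduler server uri (default port)

# quote() with safe='' percent-encodes '/' and ':' as well
url = (
    f"http://{scheduler_host}/webadmin/denodo-scheduler-admin/public/api"
    f"/projects/{project_id}/jobs/{job_id}/status?uri={quote(server_uri, safe='')}"
)
print(url)
```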

Then, we start a session, provide the authentication for the Scheduler server and, in the put function of the Requests library, specify the following parameters:

  • url: the URL of the Denodo Scheduler API endpoint.
  • headers: {'accept': 'application/json', 'Content-Type': 'application/json'}
  • json: {'action': 'start'} is the JSON body used to start the job.

Finally, we check if the API request has been successfully done (status 204).

Complete Script

import os
import win32file
import win32con
import requests

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}

FILE_LIST_DIRECTORY = 0x0001
path_to_watch = "."
hDir = win32file.CreateFile(
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

while 1:
  results = win32file.ReadDirectoryChangesW(
    hDir,
    1024,
    True,
    win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
    win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
    win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
    win32con.FILE_NOTIFY_CHANGE_SIZE |
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
    win32con.FILE_NOTIFY_CHANGE_SECURITY,
    None,
    None
  )

  for action, file in results:
    full_filename = os.path.join(path_to_watch, file)
    # check if the file has been created or renamed
    if ACTIONS.get(action, "Unknown") == "Created" or ACTIONS.get(action, "Unknown") == "Renamed to something":
      file_type = file.split('.')[-1]  # extract the file type from the name
      if file_type == "csv":  # when the file is a csv, call the api and execute job 10
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
        s = requests.Session()
        s.auth = ('admin', 'admin')
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
          print(full_filename, ACTIONS.get(action, "Unknown"))
        else:
          print("Error executing job")
      if file_type == "json":  # when the file is a json, call the api and execute job 12
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
        s = requests.Session()
        s.auth = ('admin', 'admin')
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
          print(full_filename, ACTIONS.get(action, "Unknown"))
        else:
          print("Error executing job")
      if file_type == "xlsx":  # when the file is an excel, call the api and execute job 113
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
        s = requests.Session()
        s.auth = ('admin', 'admin')
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
          print(full_filename, ACTIONS.get(action, "Unknown"))
        else:
          print("Error executing job")
      if file_type == "xml":  # when the file is an xml, call the api and execute job 112
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
        s = requests.Session()
        s.auth = ('admin', 'admin')
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
          print(full_filename, ACTIONS.get(action, "Unknown"))
        else:
          print("Error executing job")

Script execution output

Once the script is ready, execute it with the following steps:

  1. Open a terminal.
  2. Go to the directory that you want to monitor.
  3. Activate the python environment.
  4. Execute the script with the command: python <script_name>

The screenshot below shows the output when the script is executed and we create or rename a file in the directory.

Figure 6.5: Script execution

Linux version

Prerequisites

We can also make a script compatible with Linux systems. We will need to use the following Python libraries:

  • Watchdog: this library provides a Python API and shell utilities to monitor file system events.
  • Requests: it allows you to send HTTP/1.1 requests.
  • Time: This module provides various time-related functions.

Python Script in detail

First, we import the Python libraries.

import time
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

Then, we create the OnMyWatch class, which watches for any file system change and dispatches the event to the event handler. We also create a Handler class, an object that will be notified when something happens in the file system.

In this case we use two methods for monitoring changes: on_created detects when a file or a directory is created, and on_modified is executed when a file is modified or a directory is renamed. Each of these methods receives the event object as its first parameter; the event object has 3 attributes:

  • event_type: modified/created
  • is_directory: True/False
  • src_path: path/to/observe/file

class OnMyWatch:

    watchDirectory = "."  # Replace with your desired directory path

    def __init__(self):
        self.observer = Observer()

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, self.watchDirectory, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.observer.stop()
            print("Observer Stopped")
        self.observer.join()

class Handler(FileSystemEventHandler):

    def on_modified(self, event):
        if not event.is_directory:
            print(f"Detected change: modified - {event.src_path}")
            send_api_request(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            print(f"Detected file creation: - {event.src_path}")
            send_api_request(event.src_path)

Finally, we implement a function that checks the type of the file that has been created or modified and then executes the corresponding Scheduler job.

def send_api_request(src_path):
    file_type = src_path.split(".")[-1]  # Get the file extension
    if file_type == "csv":
        # CSV file: call the Scheduler API and execute job 10
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "json":
        # JSON file: execute job 12
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xml":
        # XML file: execute job 112
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xlsx":
        # Excel file: execute job 113
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
    else:
        print(f"Ignoring file with unsupported extension: {src_path}")
        return

    s = requests.Session()
    s.auth = ('admin', 'admin')
    try:
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
            print("API request sent successfully")
        else:
            print(f"Failed to send API request. Status code: {r.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error sending API request: {e}")

if __name__ == '__main__':
    watch = OnMyWatch()
    watch.run()
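Note that the extension check in send_api_request uses split("."), which returns the whole file name when there is no extension at all. As a minimal sketch (the detect_file_type helper is a name introduced here, not part of the article's script), os.path.splitext handles that case and also normalizes uppercase extensions such as .CSV:

```python
import os

def detect_file_type(src_path):
    # os.path.splitext returns ("name", ".ext"); a file without an
    # extension yields "" instead of the whole file name.
    return os.path.splitext(src_path)[1].lstrip(".").lower()
```

With this helper, the line file_type = src_path.split(".")[-1] could be replaced by file_type = detect_file_type(src_path).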

Complete Script

import time
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class OnMyWatch:
    # Set the directory to watch
    watchDirectory = "."  # Replace with your desired directory path

    def __init__(self):
        self.observer = Observer()

    def run(self):
        event_handler = Handler()
        self.observer.schedule(event_handler, self.watchDirectory, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.observer.stop()
            print("Observer Stopped")
        self.observer.join()

class Handler(FileSystemEventHandler):

    def on_modified(self, event):
        if not event.is_directory:
            print(f"Detected change: modified - {event.src_path}")
            send_api_request(event.src_path)

    def on_created(self, event):
        if not event.is_directory:
            print(f"Detected file creation: {event.src_path}")
            send_api_request(event.src_path)

def send_api_request(src_path):
    file_type = src_path.split(".")[-1]  # Get the file extension
    if file_type == "csv":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/10/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "json":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/12/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xml":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/112/status?uri=%2F%2Flocalhost%3A8000"
    elif file_type == "xlsx":
        url = "http://localhost:9090/webadmin/denodo-scheduler-admin/public/api/projects/1/jobs/113/status?uri=%2F%2Flocalhost%3A8000"
    else:
        print(f"Ignoring file with unsupported extension: {src_path}")
        return

    s = requests.Session()
    s.auth = ('admin', 'admin')
    try:
        r = s.put(url, headers={'accept': 'application/json', 'Content-Type': 'application/json'}, json={'action': 'start'})
        if r.status_code == 204:
            print("API request sent successfully")
        else:
            print(f"Failed to send API request. Status code: {r.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Error sending API request: {e}")

if __name__ == '__main__':
    watch = OnMyWatch()
    watch.run()

Script execution output

Once the script is ready, follow these steps to execute it:

  1. Open a terminal.
  2. Go to the directory that you want to monitor.
  3. Activate the Python environment.
  4. Execute the script with the command: python3 <script_name>

The screenshot below shows the output of the script when a file is created or modified in the directory.
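Because watchDirectory is set to ".", the script monitors whichever directory it is launched from, which is why step 2 above is needed. As a small sketch (assuming the OnMyWatch class shown earlier; resolve_watch_dir is a helper introduced here), the directory could instead be passed as a command-line argument:

```python
import sys

def resolve_watch_dir(argv):
    # First command-line argument if given, otherwise the current directory.
    return argv[1] if len(argv) > 1 else "."

# Usage with the script above (OnMyWatch assumed to be defined):
# OnMyWatch.watchDirectory = resolve_watch_dir(sys.argv)
# OnMyWatch().run()
```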

Step 7: Deletion of views and data sources

A Scheduler job can also be created for the deletion of base views and data sources: when a file in the monitored directory is deleted, all of its related data sources and base views can be deleted as well. For this to happen automatically, the data sources should be dropped with the CASCADE option so that all dependent views are removed too.

For accuracy, it is recommended to first create a view that shows the data sources to be deleted. To do so, use the GET_ELEMENTS() stored procedure to list the data sources that already exist and compare the result with the iv_list_of_files view: only the data sources whose files have been deleted are returned.

  1. Open a VQL Shell and run the query below:

CREATE OR REPLACE VIEW iv_elements_to_be_deleted FOLDER = '/02 - integration/03 - files deletion' AS
SELECT name, subtype, path_to_file
FROM get_elements()
WHERE (type = 'datasource' AND
       (folder = '/01 - integration/final view/01 - data sources/xml' OR
        folder = '/01 - integration/final view/01 - data sources/delimited files' OR
        folder = '/01 - integration/final view/01 - data sources/json' OR
        folder = '/01 - integration/final view/01 - data sources/excel') AND
       input_database_name = 'byod' AND
       name NOT IN
       (SELECT name_for_new_datasource
        FROM iv_list_of_files
        WHERE (recursive = false AND parentfolder = path_to_file)))
USING PARAMETERS ( path_to_file : text );

Note: The name_for_new_datasource attribute created in the earlier example contains the exact name of each data source. If you named your data sources differently, replace it with an attribute that contains the exact data source name. Likewise, if you have organized your files into different folders, adapt this part of the condition: (folder = '/datasources/xml' OR folder = '/datasources/delimited_files' OR folder = '/datasources/json' OR folder = '/datasources/excel'). In general, each folder value must be the exact path shown in the data source's Configuration > Metadata > Folder.

  2. Open the Denodo Scheduler and create a new job:
     1. Click on Jobs > Add job > VDP.
     2. Choose the project, name the job and enter a description.
     3. Go to the Extraction section.
     4. Choose VDP as the Data source.
     5. Set the Parameterized query to: DROP DATASOURCE @subtype "@param" CASCADE.
     6. Click on Add source and choose VDP as the Data source.
     7. Enter in the Query (non parameterized) field:

select name, subtype from byod_setup.iv_elements_to_be_deleted where path_to_file = 'path/to/file'

     8. Click on Add new mapping: enter param as Query parameter and name as Source parameter.
     9. Click on Add new mapping: enter subtype as Query parameter and subtype as Source parameter.
     10. Click on Save.

Figure 7.1 Deletion job creation


Figure 7.2 Mapping creation
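The deletion job can also be triggered from the Python watcher through the same Scheduler REST endpoint pattern used for the creation jobs. A sketch of building that URL (the build_job_status_url helper is introduced here; the project and job IDs are the illustrative values used earlier):

```python
from urllib.parse import quote

def build_job_status_url(host, port, project_id, job_id, vdp_uri="//localhost:8000"):
    # Reproduces the endpoint shape used in the watcher script, with the
    # uri query parameter percent-encoded (e.g. %2F%2Flocalhost%3A8000).
    return (f"http://{host}:{port}/webadmin/denodo-scheduler-admin/public"
            f"/api/projects/{project_id}/jobs/{job_id}/status"
            f"?uri={quote(vdp_uri, safe='')}")
```

For example, build_job_status_url("localhost", 9090, 1, 10) yields the CSV job URL from the script; an on_deleted handler in the Handler class could call it with the deletion job's ID.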

Conclusions

In this document we have seen how we can automatically create the necessary data sources and base views to integrate information coming from files that are added to a repository in a dynamic way.

First, we created the necessary views to get the information from the files that are being added to the repository and to construct the VQL for the data sources and views that will be used to integrate them.

Then, we created Scheduler jobs to periodically query these views and execute the VQL obtained from them, so that the elements needed to integrate the information coming from those files are actually created. As an alternative, we also presented a Python script implementation (with versions for both Linux and Windows).

The specific example presented in this document can be adapted to fit different scenarios depending on the type of files that will be accessed, how they will be added to the data set and so on.
