The execution of the DSU ingestion Databricks notebook, the final part of the whole Data Intake Process (see the types in this section), is mainly responsible for the following tasks:
- Registering the Asset.
- Query optimization.
- Ingestion of the optimized tables in Databricks.
The DSU ingestion notebook works together with several tools, described below, among them those responsible for the creation of the Data Preview tables and for the Metadata services. This modularization guarantees the following advantages:
- Easier issue detection thanks to a simpler structure.
- Better alignment with native Databricks.
- A clearer scenario definition.
A particular aspect of the DSU ingestion notebook is that it can be customized, and with it the ingestion itself. This is done by configuring a different path in the AdditionalProperties field (Entity table) for the Entity to load, which is later used by the ExtractData pipeline. Configuring this in the Entity allows the use of the default (legacy) transfer query or a custom transfer query per Entity in the same pipeline.
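As an illustration, such a configuration could look like the following; note that the property name and the notebook path are hypothetical placeholders, not the actual Sidra keys:

```python
# Hypothetical sketch: Entity.AdditionalProperties JSON pointing the ExtractData
# pipeline to a custom transfer query; the property name and path are assumptions
import json

additional_properties = json.dumps({
    "customTransferQueryPath": "/Shared/CustomScripts/MyEntityTransferQuery"
})
```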
- For existing pipelines created as part of Data Intake Processes via connector plugins before release 2022.R3, the script used for DSU ingestion will continue to be the legacy Transfer Query script. This still requires a different structure in the pipelines, to autogenerate the Transfer Query script per Entity, along with the table creation. More information can be found in the Transfer Query documentation page.
- From version 2022.R3 onwards, the connector plugins create pipelines that by default use the new DSU ingestion script. This setting can be overridden to use the legacy transfer query by changing the configuration parameters.
DSU ingestion structure
The DSU ingestion structure includes some utility scripts, called Utils, that perform common tasks and functions. The main script, TabularIngestion.py, receives the input parameters and relies on the rest of them for its activities.
An overview of the notebooks in Databricks associated with these pipeline activities comprises the DSU Ingestion Tools (the Utils scripts) and the DSU Ingestion main notebook (TabularIngestion.py).
DSU Ingestion script
The DSU ingestion script executes these steps:
Step 1. Create logger
The first step is to create a logger in order to get telemetry information from custom events or Azure logs through the Azure Event Handler or the Azure Log Handler. In this step, error notifications are sent to the Notifications table and to the Sidra web. This way, the tracing information related to the ingestion is guaranteed, as well as any messages regarding the process.
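A minimal sketch of such a logger setup, assuming the opencensus Azure exporters provide the handlers (the actual Utils implementation may differ):

```python
# Minimal sketch, assuming opencensus Azure exporters; the real Utils logger may differ
import logging
from opencensus.ext.azure.log_exporter import AzureEventHandler, AzureLogHandler

logger = logging.getLogger("TabularIngestion")
logger.setLevel(logging.INFO)
# Custom events and traces are exported to Azure Application Insights
logger.addHandler(AzureEventHandler(connection_string="InstrumentationKey=<your-key>"))
logger.addHandler(AzureLogHandler(connection_string="InstrumentationKey=<your-key>"))
logger.info("DSU ingestion started")
```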
Step 2. Process input parameters
The script receives some input parameters and calls the Utils script methods in order to identify the correct file to ingest:
- Asset id: the Asset identifier.
- Asset uri: the path to the file location in Azure Storage.
- AssetIsFullExtract: true by default; indicates that an Entity defined with incremental load requires a full load.
The TabularIngestion.py script, executing the methods of the TabularIngestionService.py notebook, retrieves all the information regarding that Asset and builds the whole activity to optimize the data for the ingestion in the data lake.
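In a Databricks notebook, these parameters would typically be read through widgets; a sketch under that assumption (the exact parameter names may differ):

```python
# Sketch: read the input parameters as Databricks widgets (names are assumptions);
# dbutils is available implicitly inside a Databricks notebook
asset_id = dbutils.widgets.get("assetId")
asset_uri = dbutils.widgets.get("assetUri")
asset_is_full_extract = dbutils.widgets.get("assetIsFullExtract").lower() == "true"
```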
Step 3. Register Asset
The input parameters indicate whether the Asset has already been registered by a calling pipeline (e.g. RegisterAsset). If it has not, the Asset is registered as part of the notebook execution.
Step 4. Set the metadata information
In this step, the metadata information retrieved from the Sidra API is registered.
Step 5. Load file into a DataFrame
File reading options (e.g. for CSV files) are configured per Entity; they can be checked on the Entity documentation page. The Entity fields, such as FieldDelimiter, and the JSON field additionalProperties.readerOptions are used to set the options for the ReaderService script and thus load the file into a correct DataFrame structure.
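For example, options parsed from additionalProperties.readerOptions could be applied as follows; this is a sketch, and the concrete option names depend on the Entity configuration:

```python
# Sketch: apply per-Entity reader options when loading the file into a DataFrame;
# spark is the SparkSession available in a Databricks notebook
import json

reader_options = json.loads('{"header": "true", "delimiter": ";"}')  # e.g. from additionalProperties.readerOptions
df = spark.read.options(**reader_options).csv(asset_uri)
```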
Step 6. Truncate old partition from same file
The partition in the Entity's table and in the Validation Errors table is dropped, depending on the consolidation mode configured for the Entity (see Step 10).
Step 7. Create the Staging Tables
Before the data from the file is inserted into the table created by the CreateTables.py script, it is loaded into a temporary table, the Staging Table. During this step, the fields needed to create the Staging Table are retrieved and the data is inserted by means of the table and validations staging query.
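A minimal sketch of this idea, with hypothetical table names:

```python
# Sketch: persist the loaded DataFrame as a staging table (names are hypothetical)
staging_table = f"{entity_name}_staging"
df.write.mode("overwrite").saveAsTable(staging_table)
```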
Step 8. Configure query optimization
Some parameters to optimize the query are set up during this step.
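The concrete parameters are internal to the script; purely as an illustration, typical Spark settings of this kind include:

```python
# Illustrative assumptions only: typical Spark settings tuned before the staging query
spark.conf.set("spark.sql.shuffle.partitions", "200")  # shuffle parallelism
spark.conf.set("spark.sql.adaptive.enabled", "true")   # adaptive query execution
```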
Step 9. Create table and validations staging query
A Spark SQL query generated to copy the data to the Staging Table is executed. When the file is read, the data is stored in the Staging Table; after that, a temporary view is created to hold the data that will be inserted into the final table for the Entity and into the validation errors table. While copying the data to the Staging Table, the Spark SQL query also performs additional validations; see Step 11 below for the validation errors calculation.
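The generated statement could take roughly the following shape; this is a hedged sketch in which the column checks and names are hypothetical, the real ones being derived from the Attributes metadata:

```python
# Hedged sketch: generated Spark SQL copying staging data into a temporary view
# while flagging constraint violations (columns and names are hypothetical)
spark.sql(f"""
    CREATE OR REPLACE TEMPORARY VIEW {staging_table}_validated AS
    SELECT *,
           (id IS NULL OR LENGTH(name) > 50) AS HasErrors
    FROM {staging_table}
""")
```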
Step 10. Set options for Consolidation mode
A Consolidation Mode condition is applied depending on whether the tables already exist or not.
Step 11. Insert into validation errors table
The Validation Errors table is filled with data from the Staging Table; only the rows that have errors are inserted. The Validation Errors calculation logic determines whether there is any validation error in each of the records of the tables to ingest. The validations are performed against constraint violations of the Attributes metadata.
The mode of data consolidation in Databricks can be configured in the AdditionalProperties column of the Entity table. Inside this JSON there is a field called consolidationMode, whose possible values are Merge and Snapshot. Depending on this value, and on whether the tables already exist or not, the DSU ingestion script will customize the statement to insert/update the data in the DSU tables.
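For example, the JSON could look like this (the consolidationMode field and its values are as documented above; the surrounding structure is an illustration):

```python
# Example AdditionalProperties JSON selecting the consolidation mode
import json

additional_properties = json.dumps({
    "consolidationMode": "Merge"  # or "Snapshot"
})
```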
The Attributes table contains a list of all the metadata about each of the Attributes to be loaded into Sidra platform.
More specifically, there are two such Attribute fields: one specifies whether the value can be NULL, and the other, MaxLength, specifies the maximum size of the column.
The logic to calculate the ValidationErrors table is the following (see the sketch after this list):
- The field HasErrors in the validation errors table in the data lake is set to true if any of these constraint violations happens for any record.
- The field ViolatedConstraints in the validation errors table in the data lake contains a description of the constraints that have been violated during the load of the Asset.
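Continuing the earlier sketch, the insert into the validation errors table could look roughly like this; the table and column names are hypothetical, and the target schema is assumed to match the SELECT:

```python
# Hedged sketch: keep only the failing rows, together with a description of the
# violated constraints (names are hypothetical; target schema assumed to match)
spark.sql(f"""
    INSERT INTO {entity_name}validationerrors
    SELECT *,
           CONCAT_WS('; ',
               CASE WHEN id IS NULL THEN 'id violates NOT NULL' END,
               CASE WHEN LENGTH(name) > 50 THEN 'name exceeds MaxLength' END
           ) AS ViolatedConstraints
    FROM {staging_table}_validated
    WHERE HasErrors
""")
```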
More info about the validation errors table can be found in the create tables script documentation page.
Step 12. Drop staging tables
At this point, Staging tables are dropped, as they are no longer needed.
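A one-line sketch of this cleanup, reusing the hypothetical staging table name from Step 7:

```python
# Sketch: drop the temporary staging table created in Step 7
spark.sql(f"DROP TABLE IF EXISTS {staging_table}")
```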
Step 13. Generate the Data Catalog
In this step, the script calls the DataPreviewService.py script, resulting in the creation of the preview tables. These tables are used inside the Data Catalog in the Sidra Web UI.
This functionality depends on a field in the additionalProperties column of the Entity configuration, from which it is enabled. For more information check this page.
The information regarding the ingestion process is then ready to be shown in the Data Catalog of the Sidra UI.