DSU Ingestion

The execution of the DSU ingestion Databricks notebook, the final part of the whole Data Intake Process (see the types in this section), is mainly responsible for the following tasks:

  • Registering the Asset.
  • Query optimization.
  • Ingestion of the optimized tables in Databricks.

The DSU ingestion notebook works together with several tools described below, responsible, among other things, for the creation of the Data Preview tables and for the Metadata services. This modularization brings the following advantages:

  • Simpler structure, which makes issue detection easier.
  • Better alignment with native Databricks.
  • Clearer scenario definition.

A particular aspect of the DSU ingestion notebook is that it can be customized, and with it the ingestion itself. This is done by configuring a different path in the AdditionalProperties field (Entity table) for the Entity to load, which is later used by the ExtractData pipeline. Configuring this per Entity allows the same pipeline to use either the default (legacy) transfer query or a custom transfer query for each Entity.
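
As a purely illustrative sketch, such a configuration could look like the following; the property name and notebook path below are hypothetical examples, not the canonical AdditionalProperties schema:

# Hypothetical example only: the real property names and path depend on the Sidra version.
additional_properties = {
    "TransferQuery": "/SharedNotebooks/CustomTransferQueries/MyEntityTransferQuery"
}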

DSU ingestion structure

The logic of all the notebooks that make up the Tabular Ingestion is packaged in an SDK. After a Sidra DSU is deployed, the SDK is deployed in Databricks, and the notebooks call the different classes of the SDK ingestion logic. The main script, TabularIngestion.py, receives the input parameters and uses the activities provided by the rest of them.

An overview of the notebooks in Databricks is:

  1. DSU Ingestion folder: Includes the main notebooks CreateTables.py, TabularIngestion.py, and UpdateTables.py, which call utility scripts to execute common tasks and functions from the SDK and retrieve the parameters necessary for the ingestion of tables into the Data Storage Unit (a unit within the data lake). The schema evolution logic is also located in UpdateTables.py.

  2. Services folder:

    1. DataPreviewService folder:

      • DataPreviewService.py

    2. DSUIngestionService folder:

      • TabularIngestionService.py
      • Models folder:
        • FieldDefinition.py

    3. MetadataService folder:

      • AssetService.py
      • EntityService.py
      • ProviderService.py
      • Models folder:
        • AttributeModel.py
        • EntityModel.py
        • ProviderModel.py

    4. PIIDetection folder:

      • PII_Detection.py

    5. ReaderService folder:

      • ReaderService.py

DSU Ingestion script

The DSU ingestion script executes these steps:

Step 1. Create logger

The first step is to create a logger in order to get the telemetry information from custom events or Azure logs through the Azure Event Handler or the Azure Log Handler. In this step, error notifications are also sent to the Notifications table and to the Sidra web.

This way, the tracing information related to the ingestion, as well as any messages regarding the process, is guaranteed to be available.
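
As an illustration of the pattern only, a logger with a handler attached could be set up as below; in the real notebook the Azure Event Handler and Azure Log Handler provided by the SDK take the place of the stand-in handler:

import logging

# Stand-in configuration; the SDK attaches its own Azure Event/Log handlers instead.
logger = logging.getLogger("TabularIngestion")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

logger.info("Starting DSU ingestion")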

Step 2. Process input parameters

The script receives some input parameters, calling the Utils script methods in order to identify the correct file to ingest:

  • asset_id as the Asset identifier.
  • asset_uri, which is the path to the file location in Azure Storage.
  • asset_is_full_extract, false by default; when true, an Entity defined with incremental load performs a full load instead.

The TabularIngestion.py script, executing the methods of the TabularIngestionService.py notebook, retrieves all the information about that Asset and creates the activity that optimizes the data for ingestion into the data lake.
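
In a Databricks notebook these parameters are typically received as widget values; a minimal sketch, assuming the parameter names listed above are passed as notebook widgets, could be:

# Read the input parameters of the notebook (Databricks widgets).
asset_id = int(dbutils.widgets.get("asset_id"))
asset_uri = dbutils.widgets.get("asset_uri")
asset_is_full_extract = dbutils.widgets.get("asset_is_full_extract").lower() == "true"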

Step 3. Register Asset

Whether an Asset has already been registered by a calling pipeline (e.g. RegisterAsset) or not is indicated in the parameters. If not, the Asset is registered as part of the notebook execution.

Step 4. Set the metadata information

In this step, the metadata information retrieved from the Sidra API is registered.

Step 5. Truncate old partition from same file

If the consolidation mode corresponds to the snapshot mode, the partition previously loaded from the same file is dropped from the Entity's table.
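
Assuming Delta tables and hypothetical table and column names, the effect of this step is similar to the following sketch; the real statement is generated by the service from the Entity metadata:

# Hypothetical names: remove the rows previously loaded from the same source file.
spark.sql("""
    DELETE FROM provider_database.entity_table
    WHERE IdSourceItem = 12345
""")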

Step 6. Load file into a DataFrame

File reading options (e.g. for CSV files) are configured per Entity; they can be checked in its documentation page. The Entity fields SerdeProperties, header, and FieldDelimiter in the JSON field additionalProperties.readerOptions are used to set the options for the ReaderService script and thus load the file into a correct DataFrame structure.
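
As a simplified illustration of what the ReaderService does for a CSV Asset (in practice the option values come from the Entity metadata described above):

# Simplified example: the option values are taken from the Entity's reader options.
df = (
    spark.read
        .option("header", "true")       # from the header setting
        .option("delimiter", ",")       # from FieldDelimiter
        .csv(asset_uri)                 # asset_uri received as input parameter
)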

Step 7. Create the Staging Tables

Before the data from the file is inserted into the table created by the CreateTables.py script, it is inserted into a temporary table, the Staging Table.

During this step, the fields necessary to create the Staging Table are retrieved, and the data is inserted by means of the staging table and validations query.
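
A minimal sketch of this step, with a hypothetical staging table name (the real name, fields, and validation columns are generated by the service):

# Hypothetical staging table name; the real one is generated per Asset.
df.write.mode("overwrite").saveAsTable("provider_database.entity_table_staging")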

Step 8. Configure query optimization

Some parameters to optimize the query are set up during this step, for example:

# Enable dynamic partition inserts
spark.sql('SET hive.exec.dynamic.partition = true')
# Allow all partition columns to be dynamic (no static partition required)
spark.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
# Do not sort records by the dynamic partition columns before writing
spark.sql('SET hive.optimize.sort.dynamic.partition = false')

Step 9. Create table staging query

A Spark SQL query generated to copy the data to the Staging Table is executed.

Once the file is read, the data is stored in the Staging Table, and a temporary view is then created to hold the data that will be inserted into the final table for the Entity.
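
For illustration, the generated statements follow a pattern similar to this sketch; the table and view names are hypothetical, and the real query also adds the system and validation columns defined in the metadata:

# Hypothetical names, for illustration only.
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW entity_table_to_insert AS
    SELECT * FROM provider_database.entity_table_staging
""")

spark.sql("""
    INSERT INTO provider_database.entity_table
    SELECT * FROM entity_table_to_insert
""")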

Step 10. Set options for Consolidation mode

A Consolidation Mode condition is applied depending on whether the tables already exist or not.
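
The exact behavior is driven by the Entity's consolidation mode; as a rough, purely illustrative sketch:

# Purely illustrative; names and consolidation modes depend on the Entity configuration.
consolidation_mode = "Snapshot"  # taken from the Entity configuration

if not spark.catalog.tableExists("provider_database.entity_table"):
    # First load: create the final table from the staged data.
    df.write.saveAsTable("provider_database.entity_table")
elif consolidation_mode == "Snapshot":
    # Snapshot mode: append the new data (the old partition was dropped in Step 5).
    df.write.mode("append").saveAsTable("provider_database.entity_table")
else:
    # A merge-style consolidation would use a MERGE INTO statement instead.
    pass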

Step 11. Update EntityDeltaLoad in incremental loads

This step updates the delta load record (last delta value) for a specific Entity in an incremental load, based on the most recently loaded data.
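
Conceptually, the new last delta value comes from the data that was just loaded, along these lines (the delta column name is hypothetical and is defined by the Entity's incremental load configuration):

from pyspark.sql import functions as F

# Hypothetical delta column; the real one is defined in the Entity configuration.
new_last_delta = df.agg(F.max("ModifiedDate")).collect()[0][0]

# The EntityDeltaLoad record for the Entity is then updated with new_last_delta
# through the Sidra metadata services.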

Step 12. Drop staging tables

At this point, the Staging Tables are dropped, as they are no longer needed.
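
For example, with the hypothetical staging table name used above:

# The staging table is no longer needed once the data has been consolidated.
spark.sql("DROP TABLE IF EXISTS provider_database.entity_table_staging")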

Step 13. Monthly Active Rows

The MAR (Monthly Active Rows) measure is calculated as part of the internal Sidra telemetry, based on the selected consolidation mode and incremental load.

Step 14. Generate the Data Catalog

The script calls the DataPreviewService.py script in this step, resulting in the creation of the preview tables. These tables are used by the Data Catalog in the Sidra Web UI.

This functionality depends on a field in the additionalProperties column of the Entity configuration and is enabled from there. For more information, check this page.

After this, the information regarding the ingestion process is ready to be shown in the Data Catalog of the Sidra UI.

Step 15. Data Quality validations

The script checks whether the Data Quality Service is enabled and, if so, executes it.

Step 16. Anomaly Detection

Finally, the script executes the Anomaly Detection process.


Last update: 2024-04-08