
DSU Ingestion

2022.R3

The DSU ingestion Databricks notebook runs as the final part of the whole Data Intake Process (see the types in this section) and is mainly responsible for the following tasks:

  • Registering the Asset.
  • Optimizing the query.
  • Ingesting the optimized tables in Databricks.

The DSU ingestion notebook works together with several tools described below, responsible among other things for the creation of the Data Preview tables and for the Metadata services. This modularization brings the following advantages:

  • A simpler structure that makes issue detection easier.
  • Better alignment with native Databricks.
  • A clearer scenario definition.

A particular aspect of the DSU ingestion notebook is that it can be customized, and with it the ingestion itself. This is done by configuring a different path in the AdditionalProperties field (Entity table) for the Entity to load; that path is later used by the ExtractData pipeline. Configuring this in the Entity allows the same pipeline to use either the default (legacy) transfer query or a custom transfer query per Entity.
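As an illustrative sketch, the JSON content of AdditionalProperties for such an Entity could look like the snippet below (shown as a Python dict). The transferQuery field name and the notebook path are assumptions for illustration, not the exact schema:

# Hypothetical AdditionalProperties content for an Entity; field name and path are assumptions
additional_properties = {
    "transferQuery": "/Shared/Sidra/TransferQueries/MyEntityCustomIngestion"
}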

DSU ingestion structure

The DSU ingestion structure includes some utility scripts, called Utils, that perform common tasks and functions. The main script, TabularIngestion.py, receives input parameters and uses the activities provided by the rest of them.

An overview of the Databricks notebooks associated with these pipeline activities is the following:

  1. CreateTables notebook

  2. DSU Ingestion Tools:

    1. DataPreviewService folder:

      • DataPreviewService.py
    2. DSUIngestionService folder:

      • TabularIngestionService.py
    3. MetadataService folder:

      • AssetService.py
      • EntityService.py
      • ProviderService.py
      • Models folder:

        • AttributeModel.py
        • EntityModel.py
        • ProviderModel.py
    4. ReaderService folder:

      • ReaderService.py
  3. DSU Ingestion main:

    • TabularIngestion.py

DSU Ingestion script

The DSU ingestion script executes these steps:

Step 1. Create logger

The first step is to create a logger in order to collect the telemetry information from custom events or Azure logs through the Azure Event Handler or the Azure Log Handler. In this step, error notifications are also sent to the Notifications table and to the Sidra web.

This way, the tracing information related to the ingestion is guaranteed, as well as any messages regarding the process.
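A minimal sketch of what this step could look like, assuming hypothetical handler classes exposed by the Utils scripts (the module and class names below are illustrative, not the exact Sidra API):

import logging

# Hypothetical handlers provided by the Utils scripts (names are assumptions)
from Utils.AzureHandlers import AzureEventHandler, AzureLogHandler

# Create the logger used for all tracing during the ingestion
logger = logging.getLogger("TabularIngestion")
logger.setLevel(logging.INFO)
logger.addHandler(AzureEventHandler())  # custom events (telemetry)
logger.addHandler(AzureLogHandler())    # traces sent to Azure logs

logger.info("DSU ingestion started")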

Step 2. Process input parameters

The script receives some input parameters, processed by calling the Utils script methods, in order to identify the correct file to ingest (see the sketch after this list):

  • Asset id as the Asset identifier.
  • Asset uri, which is the path to the file location in Azure Storage.
  • AssetIsFullExtract, false by default; when true, an Entity defined with incremental load is forced to perform a full load.
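In a Databricks notebook these parameters are typically received as widget values; the exact parameter names below are assumptions:

# Hedged sketch: read the input parameters (dbutils is available in Databricks notebooks)
asset_id = dbutils.widgets.get("assetId")
asset_uri = dbutils.widgets.get("assetUri")
asset_is_full_extract = dbutils.widgets.get("assetIsFullExtract").lower() == "true"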

The TabularIngestion.py script, executing the methods of the TabularIngestionService.py notebook, gets all the information regarding that Asset and creates the whole activity to optimize the data for the ingestion in the data lake.

Step 3. Register Asset

The input parameters indicate whether the Asset has already been registered as part of a calling pipeline (e.g. RegisterAsset). If not, the Asset is registered as part of the notebook execution.

Step 4. Set the metadata information

In this step, the metadata information retrieved from the Sidra API is registered.

Step 5. Load file into a DataFrame

File reading options (e.g. for CSV files) are configured per Entity and can be checked in its documentation page. The Entity fields SerdeProperties, header and FieldDelimiter in the JSON field additionalProperties.readerOptions are used to set the options for the ReaderService script and thus load the file into a correct DataFrame structure.
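As a hedged sketch, assuming a readerOptions value similar to the one below (the option names are standard Spark CSV reader options, while the values are illustrative), the file could be loaded as follows:

# Illustrative readerOptions resolved from additionalProperties; values are assumptions
reader_options = {"header": "true", "delimiter": ";", "encoding": "UTF-8"}

# asset_uri is the input parameter read in Step 2
df = spark.read.options(**reader_options).csv(asset_uri)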

Step 6. Truncate old partition from same file

The partition in the Entity's table and in the Validation Error table is dropped if the consolidation mode corresponds to the snapshot mode.
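A hedged sketch of what this truncation could look like for Delta tables; the database, table names and the AssetId column are assumptions, and the exact statement depends on the table format:

# Hedged sketch: in snapshot mode, remove data previously ingested for the same Asset
database, entity_table = "datalake_db", "myentity"  # illustrative names
consolidation_mode = additional_properties.get("consolidationMode", "Snapshot")  # see Step 11
if consolidation_mode == "Snapshot":
    spark.sql(f"DELETE FROM {database}.{entity_table} WHERE AssetId = {asset_id}")
    spark.sql(f"DELETE FROM {database}.{entity_table}validationerrors WHERE AssetId = {asset_id}")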

Step 7. Create the Staging Tables

Before inserting the data from the file into the table created by the CreateTables.py script, the data is inserted into a temporary table, the Staging Table.

During this step, the fields needed to create the Staging Table are retrieved and inserted by means of the table and validations staging query.
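A minimal sketch of how the DataFrame loaded in Step 5 could be persisted as a staging table; the naming convention is an assumption:

# Hedged sketch: write the DataFrame from Step 5 into a temporary staging table
staging_table = f"staging.{entity_table}_{asset_id}"  # hypothetical naming convention
df.write.mode("overwrite").saveAsTable(staging_table)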

Step 8. Configure query optimization

Some parameters to optimize the query are set up during this step, for example:

spark.sql('SET hive.exec.dynamic.partition = true')
spark.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
spark.sql('SET hive.optimize.sort.dynamic.partition = false')

Step 9. Create table and validations staging query

A SparkSQL query generated to copy the data to the Staging Table is executed.

When the file is read, the data is stored in the Staging Table and, after that, a temporary view is created with the data that will be inserted into the final table for the Entity and into the validation errors table.

While the SparkSQL query copies the data to the Staging Table, it also performs additional validations. See Step 11 below for the validation errors calculation.
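A hedged sketch of the kind of statement this step could generate; the view name is illustrative, and the validation columns added at this point are detailed in Step 11:

# Hedged sketch: expose the staged data as a temporary view feeding both the final
# Entity table and the validation errors table
spark.sql(f"""
    CREATE OR REPLACE TEMPORARY VIEW {entity_table}_staging_view AS
    SELECT s.*
    FROM {staging_table} s
""")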

Step 10. Set options for Consolidation mode

A condition based on the Consolidation Mode is applied, depending on whether the tables already exist or not.

Step 11. Insert into validation errors table

The Validation Errors table is filled with data from the Staging Table; only the rows which have errors are inserted. The validation errors calculation logic determines whether each of the records to ingest contains any validation error. The validations check for constraint violations against the Attributes metadata.

The mode of data consolidation in Databricks can be configured in the AdditionalProperties column of the Entity table. Inside this JSON there is a field called consolidationMode, whose possible values are Merge or Snapshot. Depending on this value, and also on whether the tables already exist or not, the DSU ingestion script customizes the statement used to insert/update the data in the DSU tables.
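As a hedged sketch, the statement selection could look roughly like this; the merge condition, the view name and the table-existence check are illustrative assumptions:

# Hedged sketch of how the insert/update statement could be chosen
consolidation_mode = additional_properties.get("consolidationMode", "Snapshot")

if consolidation_mode == "Merge" and spark.catalog.tableExists(f"{database}.{entity_table}"):
    # Merge the staged rows into the existing table (the match column is an assumption)
    statement = (f"MERGE INTO {database}.{entity_table} AS t "
                 f"USING {entity_table}_staging_view AS s ON t.Id = s.Id "
                 "WHEN MATCHED THEN UPDATE SET * "
                 "WHEN NOT MATCHED THEN INSERT *")
else:
    # Snapshot mode (or first load): insert the staged rows, the old partition having
    # already been dropped in Step 6
    statement = f"INSERT INTO {database}.{entity_table} SELECT * FROM {entity_table}_staging_view"

spark.sql(statement)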

The Attributes table contains a list of all the metadata about each of the Attributes to be loaded into the Sidra platform. More specifically, there are two fields, called IsNullable and MaxLength, which specify whether the value can be NULL and the maximum size of the column, respectively.

The logic to calculate the validation errors table is the following (a hedged sketch is shown after this list):

  • The field HasErrors in the validation errors table in the data lake is set to true if any of these constraint violations happens for any record.
  • The field ViolatedConstraints in the validation errors table in the data lake contains a description of the constraints that have been violated during the load of the Asset.
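A hedged sketch of how these two fields could be derived from the Attribute metadata; the attribute name (Name), its MaxLength value and the target table name are assumptions:

# Hedged sketch: compute HasErrors / ViolatedConstraints for a single attribute "Name"
# defined with IsNullable = false and MaxLength = 50 (names and values are assumptions)
errors_df = spark.sql(f"""
    SELECT s.*,
           (s.Name IS NULL OR LENGTH(s.Name) > 50) AS HasErrors,
           CONCAT_WS(' ',
               CASE WHEN s.Name IS NULL THEN 'Name violates IsNullable.' END,
               CASE WHEN LENGTH(s.Name) > 50 THEN 'Name violates MaxLength (50).' END
           ) AS ViolatedConstraints
    FROM {staging_table} s
""").where("HasErrors")

# Only the rows with errors are inserted into the validation errors table
errors_df.write.mode("append").saveAsTable(f"{database}.{entity_table}validationerrors")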

More info about the validation errors table can be found in the create tables script documentation page.

Step 12. Drop staging tables

At this point, Staging tables are dropped, as they are no longer needed.
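A minimal sketch, assuming the staging table and view names used in the previous sketches:

# Drop the temporary staging objects created during the ingestion
spark.sql(f"DROP TABLE IF EXISTS {staging_table}")
spark.catalog.dropTempView(f"{entity_table}_staging_view")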

Step 13. Generate the Data Catalog

In this step, the script calls the DataPreviewService.py script, resulting in the creation of the preview tables. These tables are used inside the Data Catalog in the Sidra Web UI.

This functionality depends on a field in the additionalProperties column of the Entity configuration, which enables it. For more information, check this page.

At this point, the information regarding the ingestion process is ready to be shown in the Data Catalog of the Sidra UI.


Last update: 2023-07-31