Skip to content

ADF Pipelines for Asset Ingestion

The Asset ingestion into Databricks is an automated process that starts from the raw copy of the Asset in the Blob Storage in the DSU, and ends with an optimized copy of the information in the Databricks DSU (Azure Data Lake Storage).

In order to automate the process, the ingestion is performed by an Azure Data Factory (ADF) pipeline.

The specific pipeline to deploy and run is selected based on the configuration of the Entity to which the Asset is associated and the kind of Data Intake.

File ingestion pipelines then include some scripts generation, which are created with the parametrization of the Entity metadata details, and then executed each time the data is actually being transferred.

Step-by-step

Asset flow into Sidra's platform via landing zone

For a detailed explanation of a complete Data Intake and File ingestion using the landing zone, please continue in this page .

Metadata Extraction and Ingestion Pipelines

These accelerators available for some sources, like SQL databases, will be used for two main purposes internally in Sidra:

  • Create and populate the Sidra Core metadata structures (Entities and Attributes), from the information of the different tables and columns in the source database system. This would encompass the above actions for creating Entities and Attributes.
  • Prepare for the actual file ingestion into the Data Storage Unit by executing the create tables and DSU ingestion scripts, including all the steps for storing the optimized data in the data lake storage.

Once the metadata extractor pipeline has been created, it needs to be executed. This execution could take a while (from 5 to 15 minutes) depending on the load of Data Factory and the status and load of the associated Databricks cluster. Sidra exposes API endpoints as well to manually execute the SQL Metadata extractor pipeline.

If metadata extraction pipelines are not available for the data source, then the process for creating the Asset metadata extraction will need to be manually triggered, and executed as described in the above links (Create Entities, Create Attributes, etc.).

Configure the data extraction and ingestion pipeline

Sidra incorporates different out of the box pipelines and pipeline templates to orchestrate data extraction from source systems.

Data extraction pipeline templates create pipelines associated with the Provider in order to orchestrate the data extraction from the source.

The data extraction and ingestion pipelines to use vary depending on the type of data source.

For some types of data sources, like databases, Sidra comes with data extractor pipeline templates, like the BatchExtractAndIntakeSQL. Pipelines created out of this template perform transparently all actions from the actual extraction or movement of data from the source system, to the execution of the actual file ingestion into the Data Storage Unit. Steps 4 and 5 in this document describe how to deploy and execute these pipelines.

The specific pipeline to deploy and run is selected based on the configuration of the Entity to which the Asset is associated. There is an Entity-Pipeline association that needs to happen in order to associate an Entity with a data extraction pipeline.

Deploy the data extraction and data ingestion pipeline

Once the different pipelines for data extraction have been created (for example, any of the ExtractFrom{data source name} these pipelines need to be deployed into Azure Data Factory.

Data extraction pipelines have a pre-requirement of having all the metadata about the sources configured. In case there is a metadata extractor pipeline available, this pipeline needs to be deployed and executed first before proceeding with the deployment and execution of the data extractor pipeline. Sidra API incorporates different methods for deploying ADF pipelines.

Deploying an ADF pipeline can take several minutes depending on the status of the environment. Processing cluster resources (e.g. Databricks) are required in order to perform this operation in Sidra Core.

Ingestion pipeline selection

Knowing the Entity to which the Asset is associated, the selection of the ADF pipeline follows these automated steps:

  1. Retrieve from the Sidra Core metadata tables the relationship between pipelines and Entities. This relation is configured in the EntityPipeline table in the metadata database. This information contains all the pipelines associated to the Entity.
  2. From the previous list of pipelines, the first pipeline for ADF of the type LoadRegisteredAsset is chosen. The metadata database stores both ADF and Azure Search pipelines. In this selection process, only Azure Data Factory pipelines with type LoadRegisteredAsset are considered. The type of the pipelines is in table PipelineTemplateType.
  3. If step 2 does not produce any selection, choose the pipeline with the same name as the Entity.
  4. If step 3 does not produce any selection, choose the default pipeline configured in the platform for file ingestion.

The default pipeline for file ingestion can be configured in the Management.Configuration table using GenericPipelineName as the key and the name of the pipeline as the value, e.g. FileIngestionDatabricks.

More information about the platform configuration tables can be found in the Management metadata section.

File ingestion pipelines

The file ingestion is the process that reads the raw copy of the file and persists the information in an optimized format in the DSU, after executing some initial optimizations. Every file ingestion pipeline can have its own particularities, but all of them work in a similar way.

These pipelines execute two different Spark scripts:

  • The Table creation script creates the necessary database and tables in the Databricks cluster. More detailed information about this script can be found in Table creation section.
  • The DSU Ingestion notebooks read the raw copy of the Asset and inserts the information in the tables created by the previous script. More information about this script can be found in DSU ingestion section section.

Both scripts are executed based on the Entity's metadata and can be stored in Databricks File System (DBFS) -so they can be executed by the Databricks cluster- or in an Azure Storage account.

The scripts will be executed only if the following set of conditions are satisfied:

  • The Entity's metadata has been updated since the last time the scripts were executed.
  • The Entity's metadata flag ReCreateTableOnDeployment is active.

Pipelines per DI type

Depending on the type of Data Intake (DI) carried out, different pipelines will take part in the process.

1. Pipelines in DI via landing zone

Registration pipeline

  • RegisterAsset. This pipeline is responsible for creating the Asset into the storage account specific of the DSU and launching the ingestion process.

    The Core Storage Blob Created configured in the Data Factory detects a new file in the landing zone and executes RegisterAsset. The trigger executes the pipeline covering the parameters of the pipeline folderPath and fileName with the information of the new file detected.

    Afterwards, there will be an activity aimed to do the file encryption with an independent raw container and a specific Databricks notebook.

    A web activity is executed next, which makes a call to Sidra API endpoint loadasset, taking care of the Provider, landing zone and DSU identification, as well as the automatic movement of the folders created (with the file inside), from the landing zone to a new Azure Storage container named as the Provider and located in the same storage account (stage storage account). This endpoint will insert the information (name, date, entity, path to the landing...) of the file into the Assets table.

    Finally, the RegisterAsset pipeline will call the ingestion pipeline launching the ingestion process and it will be responsible for the deletion of the file in the landing zone, now correctly registered and ingested.

Ingestion pipeline

  • FileIngestionDatabricks. This pipeline will be responsible for invoking the Sidra API to ingest the file in the Data Storage Unit (DSU). The parameters of the pipeline are filled in with information retrieved from the Core database about the file (Asset) and the Entity associated to it.

    • PrepareDataLoad. This pipeline creates the tables and checks if the additionalProperties.TransferQuery field in the Entity configuration has been filled or not:
      • If not, the FileIngestionDatabricks pipeline will check in the next activity the status of the field UseLegacyTransferQuery:
        • If it is false or NULL, the DSU ingestion script is executed.
        • If it is true, the existing Transfer Query script in the PrepareDataLoad pipeline is executed.
      • When the additionalProperties.TransferQuery is filled, then this custom transfer query script is run.

    As well, Anomaly Detection enabling is checked in this pipeline through the paramater additionalProperties.piiDetectionEnabled.

2. Pipelines in DI with document indexing

Registration pipeline

In this case, a Hangfire job background task is listening a indexlanding container and running very 5 minutes. After detecting a binary file on it, an AzureSearchIndex pipeline (non-ADF pipeline) based on the Pipeline Template AzureSearchIndexerSemanticSearch, which will be executed and responsible for:

  1. Register the file blob as Asset.
  2. Move the Asset blob from the indexlanding container to its final destination.
  3. Invoke the Azure Search Indexer to start processing the batch of blobs.

Ingestion

After the Asset indexing is done by an Azure Search skillset which will process, extract knowledge and index the file; the projections extracted are stored in the knowledgestore container. The ingestion in Databricks will be done by the execution of an specific notebook (AzureSearchKnowledgeStoreIngestion).

3. Pipelines in Data Intake Process (DIP) via connector plugins

  ≥ 2022.R3 

Extraction pipelines

  • ExtractionOrchestrator is responsible for calling a DataExtraction pipeline which gets the Entities information and create the Assets with no Data, orchestrating the extraction.

  • ExtractMetadata pipeline contains an activity for calling the PrepareDSUforDIP pipeline by the DIP id.

Ingestion pipelines

  • PrepareDSUforDIP pipeline will filter by DIP instead of by Provider. It will be responsible for creating the database in the Databricks cluster and the tables by calling the CreateTable pipeline and this, executing the CreateTables.py notebook in Databricks. As well, the PrepareDSUforDIP pipeline will check the Schema Evolution.

  • CreateTable pipeline will create the table and validation error table for the Entity of the Asset being ingested by executing the CreateTables.py notebook.

  • ExtractData pipeline counts with a ForEach activity where, in the Ingestion condition, runs the DSU ingestion notebook. This reads the raw copy of the Asset and inserts the information in the tables created by the CreateTables.py script.

Then the DSU Ingestion script will act as a template used for all the Entities to ingest, using the information stored in the Core database about these Entities. Its location is passed through the ExtractData pipeline by the Ingest activity, executing when true the DSU ingestion notebook by a Databricks Python activity.


Sidra Ideas Portal


Last update: 2022-10-19
Back to top