Sidra Client pipelines

The goal of this document is to explain in more detail the concepts of Client Application pipelines.

Further concepts on Client Application pipelines

The Pipeline table in the Client Application metadata database has some additional fields compared to the metadata tables in Core, as described in the Concepts section.

PipelineSyncBehaviour table

PipelineSyncBehaviour. This is a table that defines the different types of behaviour that the Sync webjob will apply when executing a pipeline. The Sync webjob is responsible for synchronizing metadata and executing the pipelines associated with the Entities. The Pipeline table contains a field pointing to this behaviour.

When a pipeline is being executed, the Sync webjob will provide several parameters to the pipeline. Depending on the sync behaviour, the pipelines will be handled as follows:

  • Ignored: The pipeline will be ignored by the Sync webjob, so it will typically need to be executed explicitly using a Trigger.
  • LoadAllNewDates: All Assets for completed dates (see below on complete set of data) will be included for extraction when the Sync webjob executes the pipeline.
  • LoadFirstNewDate: Only the oldest completed set of Asset data will be included for extraction when the Sync webjob executes the pipeline.
  • LoadLastNewDate: Only the newest completed set of Asset data will be included for extraction when the Sync webjob executes the pipeline.
  • LoadPendingUpToValidDate: The Sync webjob will instruct the pipeline to load all pending Assets of any date up to the last completed date.
  • LoadUpToLastValidDate: This Sync behaviour checks those Assets that have not been loaded yet by a specific pipeline, but which are already in the Assets table. These Assets will therefore be considered for the next Client Application pipeline load. It works similarly to LoadPendingUpToValidDate, but it checks the ExtractPipelineExecution table so that it can work with multiple pipelines. This is the mandatory SyncBehaviour for loading an Entity with several pipelines, and it is also the recommended SyncBehaviour for all new load pipelines. This Sync behaviour will retry unfinished Assets per pipeline a number of times (3 by default); this can be changed through the App Service configuration by setting the Sync.AssetRetries key, as illustrated in the example after this list.
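
For reference, this is a minimal sketch of how the Sync.AssetRetries key could look in the App Service application settings (e.g. via the portal's Advanced edit JSON view); the value shown is purely illustrative:

[
    {
        "name": "Sync.AssetRetries",
        "value": "5",
        "slotSetting": false
    }
]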

Note on PipelineSyncBehaviour table

The LoadPendingUpToValidDate behaviour is required in order to avoid situations like the following:

If there is an error in the data intake process (Core ingestion, incremental loads), some of the Entities are loaded but the failed Entities are not. The overall status of the job is Succeeded, so that the iterative batch process that loads all Entities in the pipeline is not blocked. When this issue happens, e.g. if there is a set of three Entities marked as Mandatory on the Client Application side (table EntityPipeline), the Sync webjob will not trigger the pipeline until all three Assets have been generated, which could take several days (e.g. three days, assuming that on day 3 we finally have the three mandatory Assets). The LoadAllNewDates Sync behaviour is only capable of loading all completed date Assets on the date of execution of the Sync webjob, leaving the increments already loaded in Core without being loaded into the Client Application staging tables. For some Client Application business logic, we may need to know exactly on which days there were Assets that were not processed, so that the Client Application can retrieve this missing data.

The LoadPendingUpToValidDate Sync behaviour loads any Asset up to the most recent day for which there are available Assets for every mandatory Entity. In the example of the three Assets above, this means: any Assets (deltas) available on day 1 and any Assets (deltas) available on day 2.

Also, starting with Release 2022.R2, a new pipeline execution status table (Sidra.ExtractPipelineExecution) has been implemented, which tracks the execution status per Asset, per Entity and per Client Application pipeline. The LoadUpToLastValidDate Sync behaviour checks those Assets that have not been loaded yet by a specific pipeline, but which are already in the Assets table. These Assets will therefore be considered for the next Client Application pipeline load. If the status of these Assets is 3 (ImportedFromDataStorageUnit), that status will be ignored, because the reference is now the ExtractPipelineExecution status. This table is filled with the corresponding status (extracting, staging, finished, error) by an activity in the Client Application pipeline. The Client Application pipeline templates use this PipelineSyncBehaviour. It works similarly to LoadPendingUpToValidDate, but it checks the ExtractPipelineExecution table so that it can work with multiple pipelines; it is the mandatory PipelineSyncBehaviour to configure when multiple Client Application pipelines need to be executed. With this behaviour, the Sync webjob retries unfinished Assets per pipeline up to a default threshold (3), which can be changed through the App Service configuration by setting the Sync.AssetRetries key.

ExecutionParameters field

  • ExecutionParameters. This is a field in the Pipeline table in Client Applications. It is used to include values for the parameters of the pipeline when the pipeline is executed by the Sync webjob.
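
For illustration, an ExecutionParameters value could provide the stored procedure to call and override the default overwrite behaviour of the template (the parameter names below are taken from the templates described later in this page; the values are illustrative):

{
    "storedProcedureName": "[staging].[Orchestrator]",
    "overwrite": "false"
}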

Sync pipeline requirements

All the pipelines called by the Sync webjob (all except those with behaviour Ignored) require the use of a PipelineTemplate with some required parameters, in addition to the ExecutionParameters.

The values of the pipeline parameters will be populated automatically by the Sync webjob, and you can see them in the Template field from the PipelineTemplate table:

  • storageName: The storage name used to drop the content. This storage account is created by the deployment of the Client Application.
  • storageToken: The SAS token of the storage container used.
  • containerName: The storage container name where the content will be dropped.
  • queryParameters: Some parameters are populated here by the Sync webjob, which include the Assets that are going to be extracted per Entity from the Data Storage Unit.
  • overwrite: By default this value is True. This parameter can be overridden in the ExecutionParameters field for the pipeline to indicate whether the content will be either appended, or cleaned and dropped.

From the point of view of the PipelineTemplate, an example of the JSON section (Template field) with the minimum required parameters will be the following:

 "parameters": {
    "storageName": {
        "type": "string"
    },
    "storageToken": {
        "type": "string"
    },
    "containerName": {
        "type": "string"
    },
    "queryParameters": {
        "type": "array"
    },
    "overwrite": {
        "type": "string",
        "defaultValue": "true"
    }
}

There may be more parameters, depending on the PipelineTemplate.
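
Depending on the template, additional parameters are declared in the same way inside the parameters object. For instance, the Datalabs template described later in this page uses an orchestratorNotebookPath parameter; an additional entry for it could look like the following sketch (assuming a plain string type):

    "orchestratorNotebookPath": {
        "type": "string"
    }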

Complete set of data

When an Entity is defined, there is a field named FilesPerDrop (further info in the metadata description), which refers to how many files are going to be loaded for the same date for that given Entity.

For example, let's suppose that for a given Entity, every drop of data in Sidra Core requires at least three different files to be dropped into the landing zone and ingested into Sidra.

This means that, for the data to be considered complete, there should be at least the minimum number of Assets available for the selected Entities (three Assets, following the example).

In summary, the conditions to fulfill a complete set of data for a pipeline to run are the following:

  • The Entities are associated with the pipeline (through the Entity-pipeline association, see below on EntityPipeline information).
  • The EntityPipeline relationship has been tagged as Mandatory. See the mandatory Assets section for a detailed description of the Mandatory Assets feature.
  • The AssetDate field of each Asset, for all the Entities associated with the pipeline, is between the StartValidDate and EndValidDate of the Entity.

The granularity of the complete set of data is based on the field AssetDate and depends on the Sync webjob behaviour (see above in this page).

EntityPipeline information

The same extraction pipeline can be used to extract several Assets from different Entities. That can be configured by associating the same pipeline with several Entities using the EntityPipeline table.

The columns of the EntityPipeline table are:

  • IdPipeline: [Required] Identifier of the pipeline.
  • IdEntity: [Required] Identifier of the related Entity.
  • IsMandatory: [Required] If true, the Assets associated with the IdEntity need to form a complete set to run the pipeline. If false, the complete data calculation will not take this IdEntity into account. By default, TRUE.
  • PipelineExecutionProperties: [Optional] A place to put a JSON that can be used in the pipeline for different purposes. By default, NULL, empty or "{}".

PipelineExecutionProperties column

From version 1.12 onwards, the PipelineExecutionProperties column in the EntityPipeline table of Client Applications is used to add a ConsolidationMode parameter, which takes effect before any data load. A Databricks notebook is in charge of the following behaviour, depending on the value of this parameter:

  • When ConsolidationMode is set to Overwrite, the entire table will be overwritten.
  • When ConsolidationMode is set to Merge (default), the data will be merged if there is a Primary Key; otherwise, the data will be appended.
  • When ConsolidationMode is set to Append, the data is appended.
  • When any other value is set in this parameter, an error will be thrown stating that the ConsolidationMode XXX is not valid.
  • When the parameter does not exist or is empty, it will take the default value of Merge.
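
As an illustration, the PipelineExecutionProperties JSON for an EntityPipeline row could look like the following sketch (the exact key name and casing expected by the notebook are assumed here):

{
    "ConsolidationMode": "Overwrite"
}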

Extraction pipelines execution

After understanding all the above concepts for Client Application pipelines behaviour, as well as the ones in the Concepts section, we can summarize as follows:

The extraction pipelines will be launched by the Sync webjob once it has checked the following conditions:

  1. The PipelineSyncBehaviour is not Ignored.
  2. The pipeline is not associated with a Provider through an Entity that is marked as disabled.
  3. The pipeline is not being executed.
  4. There are Assets ready to be exported, i.e. the AssetStatus is MovedToDataLake or ImportingFromDataLake.
  5. If the relationship between the pipeline and the Entity contains the flag IsMandatory = true, the dates of the Assets to be exported are between the StartValidDate and EndValidDate of the Entity.
  6. There are at least as many Assets ready to be exported as the number stored in the Entity field FilesPerDrop.

Available pipeline templates

Sidra provides several PipelineTemplates which can be used to accelerate the data movement between the DSU and the Client Application. These pipelines are provided as-is and only for the purpose they are defined for. The platform is open to creating your own pipelines with your own activities, just as happens in Core, but here your pipeline should define the parameters explained in the previous points so that it can be triggered by the Sync webjob.

Current pipeline templates available are:

  • BF367329-ABE2-42CD-BF4C-8461596B961C: Executes an extraction from the DSU to the client storage and staging tables, scaling up/down the selected database during the process.
  • 19C95A0E-3909-4299-AEE1-15604819E2B0: Executes an extraction from the DSU to the Client Application storage and staging tables.
  • F5170307-D08D-4F92-A9C9-92B30B9B3FF1: Extracts Assets to the storage account and executes a notebook.
  • 202BDE4E-1465-4461-9A86-492DBFFF9312: Executes an extraction from the DSU to the Client Application storage, Databricks and staging tables.

SQL Staging Table generation

By default, for all the pipeline templates that drop the content to an SQL database, the destination staging table is auto-created by the data extraction pipeline, via the execution of a stored procedure provided by Sidra.

Sidra will create the table with the same Attributes defined in the Entity, in the same order and with the same types, except for SQL_VARIANT, VARBINARY, GEOGRAPHY and UNIQUEIDENTIFIER: the first three of these types will be converted to NVARCHAR(MAX), and UNIQUEIDENTIFIER to VARCHAR(16). This is to guarantee the compatibility between the parquet files and the Azure Data Factory Copy activity.

As the tables are recreated in every load, they don't include keys or indexes. These changes need to be explicitly included inside the orchestrator stored procedure called by the pipeline. This stored procedure also needs to ensure that the tables are optimized for the queries that will be executed.

Default pipeline template for extraction with database scale

There is a default pipeline template with the ItemId BF367329-ABE2-42CD-BF4C-8461596B961C that can be used to extract content.

This pipeline will drop the content to the Client Application storage account, as well as to staging tables in the SQL database created in the Client Application, scaling the selected SQL database up/down during the process. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.

The pipeline invokes the Sidra API to perform the extraction of the content to the Client Application storage. The content will be dropped in the storage account in .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.

This pipeline template performs the following steps:

  • Get the list of Entities to export from the Data Lake.
  • For those Entities to copy to staging tables, an API call to /query Sidra Core API endpoint will copy the data from the DSU to the client storage (raw).
  • A copy data activity is called to copy that data from the raw storage to the client DB staging tables. The staging tables will have been previously created automatically.
  • The orchestrator SQL stored procedure for the custom business logic to create the elaborated data models is called.
  • Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, status will be 0 (error).

The template includes the parameters described before to work with the Sync webjob, and it requires some mandatory parameters to be populated in the ExecutionParameters:

  • storedProcedureName: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.

For example, the following ExecutionParameters section:

{
    "storedProcedureName": "[schema].[StoredProcedure]"
}

In addition, another stored procedure is called within this pipeline that inserts the tracked execution values in the ExtractPipelineExecution table.

The new pipeline will need to be created through the Sidra API from this template. Once this pipeline instance is created in the Sidra Client Application database, under the Sidra.Pipeline table, a webjob is responsible for creating the underlying infrastructure in Azure Data Factory.

Default pipeline template for extraction

There is a default pipeline template with the ItemId 19C95A0E-3909-4299-AEE1-15604819E2B0 that can be used to extract content.

This pipeline will drop the content to the Client Application storage account, as well as to staging tables in the SQL database created in the Client Application. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.

The pipeline invokes the Sidra API to perform the extraction of the content to the Client Application storage. The content will be dropped in the storage account in .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.

The template includes the parameters described before to work with the Sync webjob, and it requires some mandatory parameters to be populated in the ExecutionParameters:

  • storedProcedureName: The stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.

For example, the following ExecutionParameters section:

{
    "storedProcedureName": "[schema].[StoredProcedure]"
}

In addition, another stored procedure is called within this pipeline that inserts the tracked execution values in the ExtractPipelineExecution table.

The new pipeline will need to be created through the Sidra API from this template. Once this pipeline instance is created in the Sidra Client Application database, under the Sidra.Pipeline table, a webjob is responsible for creating the underlying infrastructure in Azure Data Factory.

Pipeline template for a Notebook execution (Datalabs)

This is the pipeline template with ItemId F5170307-D08D-4F92-A9C9-92B30B9B3FF1 that can be used to extract content and execute a Databricks Notebook once the raw content is stored in the storage account. This can be used with the DataLab Client Application template.

If the notebook execution succeeds, the status of the Assets involved will be updated to 2; otherwise to 0.

This pipeline template performs the following steps:

  • Get the list of Entities to export from the Data Lake.
  • For those Entities to copy to storage, an API call to /query Sidra Core API endpoint will copy the data from the DSU to the client storage (raw).
  • An entry point notebook, or orchestration notebook, whose name is passed as a parameter to the pipeline, is executed. Here the user will be able to add any custom exploratory data analysis on the data that is in the client raw storage.
  • Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, status will be 0 (error).

The parameters required for using this pipeline are:

  • orchestratorNotebookPath: The path of the Notebook to execute. The Notebook should be previously created and uploaded into the Databricks instance.
  • entityName: A parameter to provide the name of the Entity (or empty).
  • pipelineName: A parameter to provide the name of the pipeline which is being executed (or empty).

Both entityName and pipelineName are parameters that are going to be provided to the Notebook; depending on the Notebook, they may or may not be useful.

For example, the ExecutionParameters section will be:

{
    "orchestratorNotebookPath": "/Shared/MyNotebook",
    "entityName": "myentity",
    "pipelineName": "mypipeline"
}

Pipeline template for extraction to storage and execute Databricks notebook

This is the pipeline template with ItemId 202BDE4E-1465-4461-9A86-492DBFFF9312, which can be used by the Basic SQL and Databricks Client Application template.

This pipeline template performs the following steps:

  • Step 1: The list of Entities to query and export from the Data Lake (DSU) is retrieved. This list will also be used for creating the tables in the Client Application Databricks.
  • Step 2: If there are any Entities requiring to create tables in the Client Application Databricks, a notebook called Create Tables in the Client Application Databricks is executed.
  • Step 3: For each Entity to export, the data is first copied from the DSU into the Client Application storage (in raw format), through a call to the /query Sidra Core API endpoint.
  • Step 4: For each Entity to export, the data is copied to the Client Application Databricks as delta tables (a notebook called LoadDataInDatabricks is executed for this data operation).
  • Step 5: An entry point notebook, or orchestration notebook, whose name is passed as a parameter to the pipeline, is executed. Here the user is able to add any custom querying logic for aggregating and correlating any data that has been made available as Databricks delta tables. The queries to execute in this notebook will be read from the configured data on the StagingConfiguration Client Application table.
  • Step 6: Another Databricks notebook, called ExtractToStagingTables, is executed to load the data into the SQL staging tables, therefore making it available for further processing.
  • Step 7: The orchestrator SQL stored procedure is called. This stored procedure executes business logic to read from the staging tables and create the final production data models.
  • Step 8: Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the Client Application database.

The parameters for using this pipeline are:

  • storedProcedureName: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content into the staging tables. This stored procedure will need to be created before deploying a pipeline from this pipeline template. The default value is [Staging].[Orchestrator].
  • notebookOrchestrator: The path of the Notebook to execute the configured queries in the StagingConfiguration table. The Notebook should be previously created and uploaded into the Databricks instance.

For example, the ExecutionParameters section will be:

{
    "notebookOrchestrator": "/Shared/MyNotebook",
    "storedProcedureName": "[staging].[Orchestrator]" 
}
