Sidra Client pipelines

The goal of this document is to explain in more detail the concepts of Client Application pipelines.

PipelineSyncBehaviour table and Pipeline table

PipelineSyncBehaviour is a table that defines the different types of behaviour that can be executed from the Sync webjob perspective. The Sync webjob is responsible for synchronizing metadata and executing the pipelines associated with the Entities (see Role of Sync and DatabaseBuilder webjobs in Client Applications).

The Pipeline table contains a field pointing to the Sync behaviour. The ExecutionParameters field is used to include values for the parameters of the pipeline when it is executed by the Sync webjob.

When a pipeline is executed, the Sync webjob provides several parameters to it. Depending on the Sync behaviour, the pipelines will be:

  • Ignored: The pipeline will be ignored by the Sync webjob, so that pipeline will typically need to be executed explicitly using a Trigger.
  • LoadAllNewDates: All completed date Assets (see below on complete set of data) will be included for extraction when the Sync webjob executes the pipeline.
  • LoadFirstNewDate: Only the oldest completed set of Asset data will be included for extraction when the Sync webjob executes the pipeline.
  • LoadLastNewDate: Only the newest completed set of Asset data will be included for extraction when the Sync webjob executes the pipeline.
  • LoadPendingUpToValidDate: The Sync webjob will instruct the pipeline to load all pending Assets of any date up to the last completed date.
  • LoadUpToLastValidDate: From version 1.12 (2022.R2) onwards, this Sync behavior checks those Assets that have not been added yet for a specific pipeline, but which are in the Assets table. These Assets will therefore be considered for the next Client Application pipeline load. This Sync behavior works similarly to LoadPendingUpToValidDate, but checks the ExtractPipelineExecution table in order to work with multiple pipelines. This is the mandatory Sync behavior for loading an Entity with several pipelines, and also the recommended Sync behavior for all new load pipelines. This Sync behaviour will retry not finished Assets per pipeline a number of times (3 by default); this can be changed through the App Service configuration by setting the Sync.AssetRetries key (see the configuration sketch after this list).
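
For illustration, the Sync behaviour of an existing Client Application pipeline can be switched in the metadata database. The following sketch is only an assumption-based example: the [Sidra] schema, the Name columns and the IdPipelineSyncBehaviour foreign key are assumed names, and the pipeline name is a placeholder; verify them against the actual Client Application database.

-- Illustrative sketch only: schema, column and pipeline names are assumptions.
UPDATE p
SET    p.IdPipelineSyncBehaviour = b.Id
FROM   [Sidra].[Pipeline] AS p
INNER JOIN [Sidra].[PipelineSyncBehaviour] AS b
       ON b.[Name] = 'LoadUpToLastValidDate'        -- recommended behaviour for new load pipelines
WHERE  p.[Name] = 'MyClientAppExtractionPipeline';  -- placeholder pipeline name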

Note on PipelineSyncBehaviour table

The behaviors LoadPendingUpToValidDate or LoadUpToLastValidDate (the latter only since version 2022.R2) are required in order to avoid situations like the following:

If there is an error in the Data Intake Process (Core ingestion, incremental loads), some of the Entities are loaded but the failed Entities are not. The overall status of the job is Succeeded, because this way we do not block the iterative batch process in the pipeline that loads all Entities. When this issue happens, e.g. if there is a set of three Entities that are marked as Mandatory on the Client Application side (table EntityPipeline), the Sync job will not trigger the pipeline until all three Assets have been generated, which could take several days (e.g. three days, assuming that on day 3 we finally have the three mandatory Assets).

The LoadAllNewDates Sync behavior is only capable of loading all completed date Assets as of the date of execution of the Sync webjob, leaving the increments already loaded in Core without being loaded into the Client Application staging tables. For some business logic of Client Applications, we may need to know exactly on which days there were Assets that were not processed, so that the Client Application can retrieve this missing data.

The LoadPendingUpToValidDate and LoadUpToLastValidDate Sync behaviors load any Asset up to the most recent day in which there are available Assets for every mandatory Entity.

In the example of the three Assets above, this means: any Assets (deltas) available on day 1 and any Assets (deltas) available on day 2.

Note on multiple pipelines per Entity

Also, from Release 2022.R2, a new pipeline execution status table (Sidra.ExtractPipelineExecution) has been implemented, which tracks the execution status per Asset, Entity and Client Application pipeline. This Sync behavior checks those Assets that have not been added yet for a specific pipeline, but which are in the Assets table. These Assets will therefore be considered for the next Client Application pipeline load. If the Asset status for these Assets is in state 3 (ImportedFromDataStorageUnit), this status will be ignored, because now the ExtractPipelineExecution status is the one taken into account.

This table is filled with the corresponding status (extracting, staging, finished, error) by an activity in the Client Application pipeline. These Client Application pipeline templates use this PipelineSyncBehavior.

The LoadUpToLastValidDate Sync behavior works similarly to LoadPendingUpToValidDate, but checks the ExtractPipelineExecution table in order to work with multiple pipelines. LoadUpToLastValidDate is the mandatory PipelineSyncBehavior to configure if it is required to execute multiple Client App pipelines. With this behavior, Sync retries not finished Assets per pipeline (3 attempts by default). This can be changed through the App Service configuration by setting the Sync.AssetRetries key.

When configuring the Sync behavior for executing different Client Application pipelines per Entity, we also need to consider the following:

  • The Client Application templates automatically fill in the values in the ExtractPipelineExecution table. This is done by an internal stored procedure called [Sidra].[InsertExtractPipelineExecution]. This step is automatically performed, without any need to add any configuration parameter to the Client Application pipeline.
  • This insertion to the ExtractPipelineExecution table is performed at the beginning of the corresponding Client Application template.

Sync pipeline requirements

All the pipelines called by the Sync webjob (all except those with the Ignored behaviour) require using a PipelineTemplate with some required parameters, apart from the ExecutionParameters.

The values of the pipeline parameters will be populated automatically by the Sync webjob, and you can see them in the Template field of the PipelineTemplate table:

  • storageName: The storage name used to drop the content. This storage account is created by the deployment of the Client Application.
  • storageToken: The SAS container token of the storage used.
  • containerName: The storage container name where the content will be dropped.
  • queryParameters: Some parameters are populated here by the Sync webjob, which include the Assets that are going to be extracted per Entity from the Data Storage Unit.
  • overwrite: By default this value is True. This parameter can be overridden in the ExecutionParameters field for the pipeline to indicate whether the content will be either appended, or cleaned and dropped.

From the point of view of the PipelineTemplate, an example of the JSON section (Template field) with the parameters will be the following (minimum required):

 "parameters": {
    "storageName": {
        "type": "string"
    },
    "storageToken": {
        "type": "string"
    },
    "containerName": {
        "type": "string"
    },
    "queryParameters": {
        "type": "array"
    },
    "overwrite": {
        "type": "string",
        "defaultValue": "true"
    }
}

There may be more parameters, depending on the PipelineTemplate.

Extraction pipelines configuration

The extraction pipeline execution is launched by the Sync webjob based on the information in the Client Application metadata database. Such information includes metadata synchronized from Core, but also the configuration of the data extraction pipelines.

The configuration of the data extraction pipelines from the DSU is done in the metadata database tables, in a very similar way to how the ingestion and extraction pipelines are configured in Sidra Core.

Once the specific extraction pipeline is configured, it can be created by setting up the appropriate information in the metadata database and allowing the DataFactoryManager job to create this pipeline in ADF.

The basic configuration of the pipeline follows the same steps as the data intake pipelines configuration for Core. However, in Client Applications, some additional configuration is required.

Step-by-step

Create a new pipeline for a Client Application

For more information, check the specific tutorial for creating a new pipeline for a Client App.

Configure a new pipeline

For more information, check the specific tutorial for configuring data intake pipelines.

Associate Entity-Pipeline

For more information, check the specific tutorial for associating Entity and pipelines.

Pipeline type

The Pipeline table in Client Applications has some additional fields compared to the fields that the Pipeline table has in Sidra Core:

  • ExecutionParameters: This is an additional field used for including values for the parameters of the Client Application pipeline when the pipeline is executed by the Sync webjob.

EntityPipeline information

The same extraction pipeline can be used to extract several Assets from different Entities. That can be configured by associating the same pipeline to several Entities using the EntityPipeline table.

Column Description
IdPipeline [Required] Identifier of the pipeline
IdEntity [Required] Identifier of the related Entity
IsMandatory [Required] If true, Assets associated with the IdEntity need to form a complete set to run the pipeline. If false, the complete data calculation will not take this IdEntity into account. By default, TRUE.
PipelineExecutionProperties [Optional] This is a place to put a JSON that can be used in the pipeline for different purposes. By default, NULL, empty or "{}".
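
For illustration, associating the same pipeline with two Entities, one mandatory and one optional, could look like the following sketch. The [Sidra] schema and the literal Id values are assumptions for the example.

-- Illustrative sketch: Id values are placeholders and the [Sidra] schema is an assumption.
INSERT INTO [Sidra].[EntityPipeline] (IdPipeline, IdEntity, IsMandatory, PipelineExecutionProperties)
VALUES
    (10, 501, 1, NULL),   -- mandatory Entity: its Assets must form a complete set before the pipeline runs
    (10, 502, 0, NULL);   -- optional Entity: not taken into account for the complete-data calculation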

PipelineExecutionProperties column

From version 1.12 (2022.R2) onwards, the PipelineExecutionProperties column in the EntityPipeline table of Client Applications has been used to add a ConsolidationMode parameter, which will take effect before any data load. A Databricks notebook will be in charge of the following functions, depending on the value of this parameter:

  • When ConsolidationMode is set to Overwrite, the entire table will be overwritten.
  • When ConsolidationMode is set to Merge (default), the data will be merged if there is a Primary Key; otherwise the data will be appended.
  • When ConsolidationMode is set to Append, the data is appended.
  • When any other value is set in this parameter, an error will be thrown stating that the ConsolidationMode XXX is not valid.
  • When the parameter does not exist or is empty, it will take the default value of Merge.
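
A hedged configuration example follows; the JSON key name is assumed to be ConsolidationMode as described above, and the [Sidra] schema and Id values are placeholders.

-- Illustrative sketch: overwrite the target table for this Entity/pipeline pair on every load.
UPDATE [Sidra].[EntityPipeline]
SET    PipelineExecutionProperties = '{"ConsolidationMode": "Overwrite"}'
WHERE  IdPipeline = 10      -- placeholder pipeline Id
  AND  IdEntity   = 501;    -- placeholder Entity Id
-- Leaving the column NULL, empty or '{}' keeps the default behaviour (Merge).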

Summary

  • An additional field IsMandatory is included in the association between Pipeline and Entity and must be taken into account when setting up this relation using the tutorial How to associate an Entity with a pipeline.
  • Pipelines in Client Applications have an additional field -ExecutionParameters-, which must be configured.

ExtractPipelineExecution table

The ExtractPipelineExecution table is an execution tracking table that is only present in a Client Application.

Column Description
Id Id of the record
IdPipeline Id of the Client Application pipeline
PipelineRunId GUID with the RunId of the pipeline execution in ADF
PipelineExecutionDate Timestamp of execution of the pipeline
IdEntity Id of the Entity
IdAsset Id of the Asset
AssetDate Business date of the Asset being ingested
AssetOrder An order number internally used in the stored procedures for loading the data
IdExtractPipelineExecutionStatus Id of the ExtractPipelineExecutionStatus for the status of the Asset during the pipeline load

If the Sync Behavior (see the Sync Behavior documentation above) is LoadUpToLastValidDate, the Asset load status per pipeline will be checked from this table.
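For example, a hedged query like the following can be used to review the load status recorded in this table per Asset and pipeline. The [Sidra] schema is an assumption; the column names follow the table description above.

-- Illustrative sketch: list the tracked status of each Asset per Client Application pipeline.
SELECT  e.IdPipeline,
        e.IdEntity,
        e.IdAsset,
        e.AssetDate,
        s.[Name]  AS ExecutionStatus,
        e.PipelineRunId,
        e.PipelineExecutionDate
FROM    [Sidra].[ExtractPipelineExecution] AS e
INNER JOIN [Sidra].[ExtractPipelineExecutionStatus] AS s
        ON s.Id = e.IdExtractPipelineExecutionStatus
WHERE   s.[Name] <> 'Finished'          -- focus on Assets not yet finished (candidates for Sync retries)
ORDER BY e.PipelineExecutionDate DESC;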

ExtractPipelineExecutionStatus

The ExtractPipelineExecutionStatus is a reference table to enumerate the different statuses involved along a Client Application pipeline execution.

Column Description
Id Id of the record
Name Name of the execution status
Description Description of the execution status

And this is the set of statuses:

Status id Status Name Status Description
0 Extracting Asset is being copied from the DSU into the Client Staging tables
1 Staging Asset already copied into the Client Staging tables
2 Finished Asset already copied into the final client tables
3 Error Issue when copying the Asset
4 Removed Asset has been deleted

The statuses in this table are updated by some Client Application pipeline activities.

Mandatory Assets

The same data extraction pipeline can be used to extract several Assets from different Entities. As in Sidra Core, EntityPipeline table is used to associate the same Client Application pipeline to several Entities.

For the Sync process to copy the data from the DSU, it is necessary to assign all Entities to their corresponding pipelines. For this, all the relationships between the Entities and the Client Application data extraction pipeline to use must be inserted into the EntityPipeline table in the Client App Sidra database.

Sidra also provides support for defining mandatory Assets, which means that the extraction pipeline will be executed only if all the Assets marked as Mandatory are present in Sidra Core, so there is at least one Asset for each of the mandatory Entities. We can mark certain Entities as optional (IsMandatory=0), for those Entities for which it is not needed that Assets are loaded in Sidra Core.

The mandatory Assets can be configured using the field IsMandatory, which is included in the EntityPipeline table and must be taken into account when setting up this relation.

Step-by-step

Associate Entity-Pipeline

Please refer to the specific tutorial for associating Entity and pipelines.

Association with Providers

An association between the Pipeline and Provider can be added using the PipelineProvider table. This association is used by the Sync webjob to execute only those pipelines that are associated to enabled Providers, i.e. those Providers with false in the IsDisabled field.
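A minimal association sketch, assuming the PipelineProvider table stores the IdPipeline and IdProvider pair under a [Sidra] schema (Id values are placeholders):

-- Illustrative sketch: link pipeline 10 to Provider 7 so that Sync considers it for execution.
INSERT INTO [Sidra].[PipelineProvider] (IdPipeline, IdProvider)
VALUES (10, 7);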

Summary

Summarizing all the concepts above, the extraction pipelines will be launched by the Sync webjob once it has checked the following conditions:

  1. The PipelineSyncBehaviour is not Ignored.
  2. The pipeline is not associated to a Provider through an Entity that is marked as disabled.
  3. The pipeline is not being executed.
  4. There are Assets ready to be exported, i.e. the AssetStatus is MovedToDataLake or ImportingFromDataLake.
  5. If the relationship between the pipeline and the Entity contains the flag IsMandatory = true, the dates of the Assets to be exported are between the StartValidDate and EndValidDate of the Entity.
  6. There are at least as many Assets ready to be exported as the number indicated in the Entity field FilesPerDrop.

Complete set of data

When an Entity is defined, there is a field named FilesPerDrop (further info in the metadata description), which refers to how many files are going to be loaded for the same date for that given Entity.

For example, let's suppose that for a given Entity, every drop of data in Sidra Core requires at least three different files to be dropped to the landing zone and ingested into Sidra.

This means that, for the data to be considered complete, there should be at least the minimum number of Assets available for the selected Entities: three Assets following the example.

In summary, the conditions to fulfill a complete set of data for a pipeline to run are the following:

  • The Entities are associated with the pipeline (through the Entity-Pipeline association, see below on EntityPipeline information).
  • The EntityPipeline relationship has been tagged as Mandatory. See the mandatory Assets section for detailed description of the Mandatory Assets feature.
  • The AssetDate fields for each Asset for all the Entities associated with the pipeline meet the condition to be between the StartValidDate and EndValidDate for the Entity.

The granularity of the complete set of data is based on the field AssetDate and depends on the Sync webjob behavior (see above in this page).
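
As an illustration, a completeness check per Entity and business date could be sketched as follows. The [Sidra] schema and the exact Asset and Entity column layout are assumptions based on the metadata described in this page; verify them against the actual database.

-- Illustrative sketch: count available Assets per Entity and AssetDate and compare the
-- result with the Entity's FilesPerDrop to decide whether the set of data is complete.
SELECT  a.IdEntity,
        a.AssetDate,
        COUNT(*)        AS AvailableAssets,
        en.FilesPerDrop AS RequiredAssets,
        CASE WHEN COUNT(*) >= en.FilesPerDrop THEN 1 ELSE 0 END AS IsComplete
FROM    [Sidra].[Asset]  AS a
INNER JOIN [Sidra].[Entity] AS en
        ON en.Id = a.IdEntity
GROUP BY a.IdEntity, a.AssetDate, en.FilesPerDrop;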

Available pipeline templates

There are several PipelineTemplates provided by Sidra which can be used to accelerate the data movement between the DSU and the Client Application. The pipelines are provided as-is and only for the purpose they are defined for. The platform is open to creating your own pipelines with your own activities, the same as happens in Core, but here your pipeline should have the defined parameters explained in the previous points so that it can be launched by the Sync webjob.

Current pipeline templates available are:

ItemId Purpose
BF367329-ABE2-42CD-BF4C-8461596B961C Execute an extraction from DSU to the client storage and staging tables, scaling up/down the selected database during the process.
19C95A0E-3909-4299-AEE1-15604819E2B0 Execute an extraction from DSU to the Client Application storage and staging tables.
F5170307-D08D-4F92-A9C9-92B30B9B3FF1 Extract Assets to the storage account and execute a notebook.
202BDE4E-1465-4461-9A86-492DBFFF9312 Execute an extraction from DSU to the Client Application storage, Databricks and staging tables.

SQL Staging Table generation

By default, for all the pipeline templates that drop the content to a SQL database, the destination staging table is auto-created by the data extraction pipeline, via the execution of a stored procedure provided by Sidra.

Sidra will create the table with the same Attributes defined in the Entity, and with the same order and types, except for SQL_VARIANT, VARBINARY, GEOGRAPHY and UNIQUEIDENTIFIER: the first three of these types will be converted to NVARCHAR(MAX), and UNIQUEIDENTIFIER to VARCHAR(16). This is to guarantee the compatibility between the parquet files and the Azure Data Factory Copy activity.

As the tables are recreated in every load, they don't include keys or indexes. These changes need to be explicitly included inside the orchestrator stored procedure called by the pipeline. This stored procedure also needs to ensure that the tables are optimized for the queries that will be executed.
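
A minimal sketch of such an orchestrator stored procedure is shown below, only as an assumption-based example: the staging and production table names, their columns and the index are placeholders, and the real procedure must implement the business logic of the Client Application.

-- Illustrative sketch only: all object and column names are placeholders.
CREATE OR ALTER PROCEDURE [Staging].[Orchestrator]
AS
BEGIN
    SET NOCOUNT ON;

    -- The staging table is recreated on every load, so any index must be (re)added here
    -- to keep the queries below optimized.
    CREATE CLUSTERED INDEX IX_Staging_MyEntity_Id ON [Staging].[MyEntity] (Id);

    -- Custom business logic: move the staged data into the production model.
    MERGE [Production].[MyEntity] AS target
    USING [Staging].[MyEntity]    AS source
        ON target.Id = source.Id
    WHEN MATCHED THEN
        UPDATE SET target.[Value] = source.[Value]
    WHEN NOT MATCHED THEN
        INSERT (Id, [Value]) VALUES (source.Id, source.[Value]);
END;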

We can distinguish between these two pipelines: the default pipeline template for extraction with database scale and the default pipeline template for extraction.

There is a default pipeline template with the ItemId 19C95A0E-3909-4299-AEE1-15604819E2B0 that can be used to extract content.

This pipeline will drop the content to the Client Application storage account, and also to staging tables in the SQL database created in the Client Application. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.

The pipeline invokes Sidra API to perform the extraction of the content to the Client Application storage. The content will be dropped in the storage account with .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.

This pipeline template performs the following steps:

  • Get the list of Entities to export from the Data Lake.
  • For those Entities to copy to staging tables, an API call to /query Sidra Core API endpoint will copy the data from the DSU to the client storage (raw).
  • A copy data activity is called to copy that data from the raw storage to the client DB staging tables. The staging tables will have been previously automatically created.
  • The orchestrator SQL stored procedure for the custom business logic to create the elaborated data models is called.
  • Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, status will be 0 (error).

The template includes the parameters described before to work with the Sync webjob, and it provides some mandatory parameters that are required to populate in the ExecutionParameters:

  • storedProcedureName: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.

For example, the following ExecutionParameters section:

{
    "storedProcedureName": "[schema].[StoredProcedure]"
}

In addition, another stored procedure is called within both pipelines that inserts the tracked execution values in the ExtractPipelineExecution table.

The new pipeline will need to be created through Sidra API from these templates. Once these pipeline instances are created in the Sidra Client Application database, under Sidra.Pipeline table, there is a webjob that is responsible for creating the underlying infrastructure in Azure Data Factory.

Pipeline template for a Notebook execution (Datalabs)

This is the pipeline template with ItemId F5170307-D08D-4F92-A9C9-92B30B9B3FF1 that can be used to extract content and execute a Databricks Notebook once the raw content is stored in the storage account. This can be used with the DataLab Client Application template.

If the notebook execution succeeds, the Assets involved will have their status updated to 2; otherwise to 0.

This pipeline template performs the following steps:

  • Get the list of Entities to export from the Data Lake.
  • For those Entities to copy to storage, an API call to /query Sidra Core API endpoint will copy the data from the DSU to the client storage (raw).
  • An entry point notebook, or orchestration notebook, whose name is passed as a parameter to the pipeline, is executed. Here the user will be able to add any custom exploratory data analysis on the data that is in the client raw storage.
  • Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, status will be 0 (error).

The parameters required for using this pipeline are:

  • orchestratorNotebookPath: The path of the Notebook to execute. The Notebook should be previously created and uploaded into the Databricks instance.
  • entityName: A parameter to provide the name of the Entity (or empty).
  • pipelineName: A parameter to provide the name of the pipeline which is being executed (or empty).

Both entityName and pipelineName are parameters that are going to be provided to the Notebook; depending on the Notebook, they may or may not be used.

For example, the ExecutionParameters section will be:

{
    "orchestratorNotebookPath": "/Shared/MyNotebook",
    "entityName": "myentity",
    "pipelineName": "mypipeline"
}

Pipeline template for extraction to storage and execute Databricks notebook

This is the pipeline template with ItemId 202BDE4E-1465-4461-9A86-492DBFFF9312, that can be used by the Basic SQL and Databricks Client Application template.

This pipeline template performs the following steps:

  • Step 1: The list of Entities to query and export from the Data Lake (DSU) is retrieved. This list will also be used for creating the tables in the Client Application Databricks.
  • Step 2: If there are any Entities requiring to create tables in the Client Application Databricks, a notebook called Create Tables in the Client Application Databricks is executed.
  • Step 3: For each Entity to export, the data is first copied from the DSU into the Client Application storage (in raw format), through a call to the /query Sidra Core API endpoint.
  • Step 4: For each Entity to export, the data is copied to the Client Application Databricks as delta tables (a notebook called LoadDataInDatabricks is executed for this data operation).
  • Step 5: An entry point notebook, or orchestration notebook, whose name is passed as a parameter to the pipeline, is executed. Here the user is able to add any custom querying logic for aggregating and correlating any data that has been made available as Databricks delta tables. The queries to execute in this notebook will be read from the configured data on the StagingConfiguration Client Application table.
  • Step 6: Another Databricks notebook, called ExtractToStagingTables, is executed to load the data into the SQL staging tables, therefore making it available for further processing.
  • Step 7: The orchestrator SQL stored procedure is called. This stored procedure executes business logic to read from the staging tables and create the final production data models.
  • Step 8: Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the Client Application database.

The parameters for using this pipeline are:

  • storedProcedureName: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content into the staging tables. This stored procedure will need to be created before deploying a pipeline from this pipeline template. The default value is [Staging].[Orchestrator].
  • notebookOrchestrator: The path of the Notebook to execute the configured queries in the StagingConfiguration table. The Notebook should be previously created and uploaded into the Databricks instance.

For example, the ExecutionParameters section will be:

{
    "notebookOrchestrator": "/Shared/MyNotebook",
    "storedProcedureName": "[staging].[Orchestrator]" 
}

Last update: 2022-07-14