
Sidra Client pipelines

The goal of this document is to explain in more detail the concepts behind Data Product pipelines.

PipelineSyncBehaviour table and Pipeline table

PipelineSyncBehaviour is a table that defines the different types of behaviour that can be executed from the Sync webjob perspective. The Sync webjob is responsible for synchronizing metadata and executing the pipelines associated with the Entities (see Role of Sync and DatabaseBuilder webjobs in Data Products).

The Pipeline table contains a field pointing to the Sync behaviour. The ExecutionParameters field is used to include values in the parameters of the pipeline when it is executed by the Sync webjob.

When a pipeline is executed, the Sync webjob provides several parameters to it. Depending on the sync behaviour, the pipeline will be handled as follows:

  • Ignored: The pipeline will be ignored by the Sync webjob, so it should be executed explicitly using a Trigger.
  • LoadAllNewDates: All completed date Assets (see below on complete set of data) will be included to be extracted by the Sync webjob execution of the pipeline.
  • LoadFirstNewDate: Only the oldest completed set of Asset data will be included to be extracted by the Sync webjob execution of the pipeline.
  • LoadLastNewDate: Only the newest completed set of Asset data will be included to be extracted by the Sync webjob execution of the pipeline.
  • LoadPendingUpToValidDate: The Sync webjob will instruct the pipeline to load all pending Assets of any date up to the last completed date.
  • LoadUpToLastValidDate: From version 1.12 (2022.R2) onwards, this Sync behavior checks those Assets that have not been added yet for a specific pipeline, but which are in the Assets table. These Assets will therefore be considered for the next Data Product pipeline load. This Sync behavior works similarly to LoadPendingUpToValidDate, but checks ExtractPipelineExecution in order to work with multiple pipelines. It is the mandatory Sync behavior for loading an Entity with several pipelines, and the recommended Sync behavior for all new load pipelines. This Sync behaviour will retry unfinished Assets per pipeline a number of times (3 by default); this can be changed through the App Service configuration by setting the Sync.AssetRetries key.
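The date-selection part of these behaviours can be sketched as follows. This is an illustrative Python sketch, not Sidra code; the function and parameter names are assumptions made for the example:

```python
from datetime import date

def select_dates(behaviour, completed_dates):
    """Sketch: which completed Asset dates each Sync behaviour would pick.
    completed_dates holds the dates with a complete set of Assets."""
    dates = sorted(completed_dates)
    if behaviour == "Ignored":
        return []            # Sync skips the pipeline entirely
    if behaviour == "LoadAllNewDates":
        return dates         # every completed date
    if behaviour == "LoadFirstNewDate":
        return dates[:1]     # only the oldest completed date
    if behaviour == "LoadLastNewDate":
        return dates[-1:]    # only the newest completed date
    raise ValueError(f"unknown behaviour: {behaviour}")

completed = [date(2023, 9, 3), date(2023, 9, 1), date(2023, 9, 2)]
print(select_dates("LoadFirstNewDate", completed))  # [datetime.date(2023, 9, 1)]
```

The pending-date behaviours (LoadPendingUpToValidDate, LoadUpToLastValidDate) additionally depend on the mandatory Entities and the tracking tables, as described in the next sections.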

Considerations for PipelineSyncBehaviour table

The behaviors LoadPendingUpToValidDate or LoadUpToLastValidDate (this last one only since version 2022.R2) are required in order to avoid situations like the following:

  • If there is an error in the Data Intake Process (Sidra Service ingestion, incremental loads), some of the Entities are loaded but the failed Entities are not. The overall status of the job is Succeeded, because this way the iterative batch process in the pipeline that loads all Entities is not blocked. When this issue happens, e.g. if there is a set of three Entities marked as Mandatory on the Data Product side (table EntityPipeline), the Sync job will not trigger the pipeline until all three Assets have been generated, which could take several days (e.g. three days, assuming that on day 3 we finally have the three mandatory Assets).

Sync behavior LoadAllNewDates is only capable of loading all completed date Assets on the date of execution of the Sync webjob, leaving the already loaded increments in Sidra Service without loading them into the Data Product staging tables. For some Data Product business logic, we may need to know exactly on which days there were Assets that were not processed, so that the Data Product can retrieve this missing data.

Sync behaviors LoadPendingUpToValidDate and LoadUpToLastValidDate load any Asset up to the most recent day on which there are available Assets for every mandatory Entity.

In the example of the three Assets above, this means: any Assets (deltas) available on day 1 and any Assets (deltas) available on day 2.
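This selection logic can be sketched as below. The sketch is illustrative only; the function names and the dictionary shape are assumptions, and days are represented as plain integers:

```python
def last_valid_date(assets_by_entity, mandatory_entities):
    """Most recent day on which every mandatory Entity has at least one Asset."""
    common = set.intersection(
        *(set(assets_by_entity.get(e, [])) for e in mandatory_entities))
    return max(common) if common else None

def pending_up_to_valid_date(assets_by_entity, mandatory_entities):
    """All pending (entity, day) Assets up to the last valid date."""
    cutoff = last_valid_date(assets_by_entity, mandatory_entities)
    if cutoff is None:
        return []
    return sorted((entity, day) for entity, days in assets_by_entity.items()
                  for day in days if day <= cutoff)

# Three mandatory Entities; EntityC only delivers its Asset on day 3.
assets = {"EntityA": [1, 2, 3], "EntityB": [1, 3], "EntityC": [3]}
print(last_valid_date(assets, ["EntityA", "EntityB", "EntityC"]))  # 3
```

Once day 3 completes the mandatory set, every pending delta from days 1 and 2 is also picked up, which is exactly the situation the LoadAllNewDates behavior cannot handle.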

Considerations for multiple pipelines per Entity

A pipeline execution status table (Sidra.ExtractPipelineExecution) has been implemented, which tracks the execution status per Asset, Entity and Data Product pipeline. This Sync behavior checks those Assets that have not been added yet for a specific pipeline, but which are in the Assets table. These Assets will therefore be considered for the next Data Product pipeline load. If these Assets are in state 3 (ImportedFromDataStorageUnit), that status will be ignored, because the ExtractPipelineExecution status is now the reference.

This table is filled with the corresponding status (extracting, staging, finished, error) by an activity in the Data Product pipeline. The Data Product pipeline templates use this PipelineSyncBehavior.

The Sync behavior LoadUpToLastValidDate works similarly to LoadPendingUpToValidDate, but checks ExtractPipelineExecution in order to work with multiple pipelines. LoadUpToLastValidDate is the mandatory PipelineSyncBehavior to configure when multiple Data Product pipelines need to be executed. With this behavior, Sync retries unfinished Assets per pipeline (default limit: 3 attempts). This can be changed through the App Service configuration by setting the Sync.AssetRetries key.
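The retry rule can be sketched as follows. This is a hypothetical illustration, not Sidra's implementation; the dictionary shape and status names are assumptions:

```python
DEFAULT_ASSET_RETRIES = 3  # overridable via the Sync.AssetRetries app setting

def assets_to_retry(executions, max_retries=DEFAULT_ASSET_RETRIES):
    """Sketch: unfinished Assets are retried while attempts stay under the limit.
    executions maps asset id -> (status, attempts so far)."""
    return [asset for asset, (status, attempts) in executions.items()
            if status != "finished" and attempts < max_retries]

executions = {"asset-1": ("finished", 1),
              "asset-2": ("error", 2),
              "asset-3": ("error", 3)}  # already used all 3 attempts
print(assets_to_retry(executions))  # ['asset-2']
```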

When configuring the Sync behavior for executing different Data Product pipelines per Entity, we also need to consider the following:

  • The Data Product templates automatically fill in the value in the ExtractPipelineExecution table. This is done by an internal stored procedure called [Sidra].[UpsertExtractPipelineExecution]. This step is performed automatically, without the need to add any configuration parameter to the Data Product pipeline.
  • This insertion into the ExtractPipelineExecution table is performed at the beginning of the corresponding Data Product template.

Sync pipeline requirements

All the pipelines called by the Sync webjob (all except those with behavior Ignored) require a PipelineTemplate (table) with some required parameters, apart from the ExecutionParameters.

The values of the pipeline parameters will be populated automatically by the Sync webjob, and you can see them in the Template field of the PipelineTemplate table:

  • storageName: The storage name used to drop the content. This storage account is created by the deployment of the Data Product.
  • storageToken: The SAS container token of the storage used.
  • containerName: The storage container name where the content will be dropped.
  • queryParameters: Some parameters are populated here by the Sync webjob, which include the Assets that are going to be extracted per Entity from the Data Storage Unit.
  • overwrite: By default this value is True. This parameter can be overridden in the ExecutionParameters field of the pipeline to indicate whether the content will be appended, or cleaned and dropped.

From the point of view of the PipelineTemplate, an example of the JSON section (Template field) with the minimum required parameters is the following:

 "parameters": {
    "storageName": {
        "type": "string"
    },
    "storageToken": {
        "type": "string"
    },
    "containerName": {
        "type": "string"
    },
    "queryParameters": {
        "type": "array"
    },
    "overwrite": {
        "type": "string",
        "defaultValue": "true"
    }
}

There may be more parameters, depending on the PipelineTemplate.

Extraction pipelines configuration

The extraction pipeline execution is launched by the Sync webjob based on the information in the Data Product metadata database. Such information includes metadata synchronized from Sidra Service, but also the configuration of the data extraction pipelines.

The configuration of the data extraction pipelines from the DSU is done in the metadata database tables, in a very similar way to how the ingestion and extraction pipelines are configured in Sidra Service.

Once the specific extraction pipeline is configured, it can be created by setting up the appropriate information in the metadata database and allowing the DataFactoryManager job to create this pipeline in ADF.

The basic configuration of the pipeline follows the same steps as the data intake pipelines configuration for Sidra Service. However, in Data Products, some additional configuration is required.

Step-by-step

Create a new pipeline for a Data Product

For more information, check the specific tutorial for creating a new pipeline for a Data Product.

Configure a new pipeline

For more information, check the specific tutorial for configuring data intake pipelines.

Associate Entity-Pipeline

For more information, check the specific tutorial for associating Entity and pipelines.

Mandatory Assets

The same data extraction pipeline can be used to extract several Assets from different Entities. As in Sidra Service, EntityPipeline table is used to associate the same Data Product pipeline to several Entities.

For the Sync process to copy the data from the DSU, it is necessary to assign all Entities to their corresponding pipelines. For this, all the relationships between the Entities and the Data Product data extraction pipeline to use must be inserted into the EntityPipeline table of the Data Product Sidra database.

Sidra also provides support for defining mandatory Assets, which means that the extraction pipeline will be executed only if all the Assets marked as Mandatory are present in Sidra Service, so that there is at least one Asset for each of the mandatory Entities. We can mark certain Entities as optional (IsMandatory=0) when it is not required that their Assets are loaded in Sidra Service.

The mandatory Assets can be configured using the field IsMandatory, which is included in the EntityPipeline table and must be taken into account when setting up this relation.

Step-by-step

Associate Entity-Pipeline

Please, refer to the specific tutorial for associating Entity and pipelines.

Summary

Summarizing all the concepts above, the extraction pipelines will be launched by the Sync webjob once it has checked the following conditions:

  1. The PipelineSyncBehaviour is not Ignored.
  2. The pipeline is not associated with a Provider through an Entity that is marked as disabled.
  3. The pipeline is not being executed.
  4. There are Assets ready to be exported, i.e., the AssetStatus is MovedToDataLake or ImportingFromDataLake.
  5. If the relationship between the pipeline and the Entity contains the flag IsMandatory = true, the dates of the Assets to be exported are between the StartValidDate and EndValidDate of the Entity.
  6. There are at least as many Assets ready to be exported as the number stored in the Entity field FilesPerDrop.
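The conditions above can be condensed into a single predicate. This is an illustrative sketch only; the field names are invented for the example and do not match Sidra's actual schema:

```python
def should_launch(pipeline):
    """Sketch of the checks the Sync webjob performs before launching a pipeline."""
    if pipeline["sync_behaviour"] == "Ignored":        # condition 1
        return False
    if pipeline["provider_disabled"]:                  # condition 2
        return False
    if pipeline["is_running"]:                         # condition 3
        return False
    ready = [a for a in pipeline["assets"]             # condition 4
             if a["status"] in ("MovedToDataLake", "ImportingFromDataLake")]
    if pipeline["is_mandatory"]:                       # condition 5
        ready = [a for a in ready
                 if pipeline["start_valid"] <= a["date"] <= pipeline["end_valid"]]
    return len(ready) >= pipeline["files_per_drop"]    # condition 6

pipeline = {
    "sync_behaviour": "LoadUpToLastValidDate",
    "provider_disabled": False,
    "is_running": False,
    "is_mandatory": True,
    "start_valid": 1, "end_valid": 31,
    "files_per_drop": 2,
    "assets": [
        {"status": "MovedToDataLake", "date": 5},
        {"status": "MovedToDataLake", "date": 6},
        {"status": "ImportedFromDataStorageUnit", "date": 7},  # already exported
    ],
}
print(should_launch(pipeline))  # True
```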

Some further considerations:

  • An additional field IsMandatory is included in the association between Pipeline and Entity and must be taken into account when setting up this relation using the tutorial How to associate an Entity with a pipeline.
  • Pipelines in Data Products have an additional field, ExecutionParameters, which must be configured.

Complete set of data

When an Entity is defined, there is a field named FilesPerDrop (further info in the metadata description), which refers to how many files are going to be loaded for the same date for that given Entity.

For example, let's suppose that for a given Entity, every drop of data in Sidra Service requires at least three different files to be dropped to the landing zone and ingested into Sidra.

This means that, for the data to be considered complete, there should be at least the minimum number of Assets available for the selected Entities; three Assets, following the example.

In summary, the conditions to fulfill a complete set of data for a pipeline to run are the following:

  • The Entities are associated with the pipeline (through the Entity-Pipeline association, see the EntityPipeline information).
  • The EntityPipeline relationship has been tagged as Mandatory. See the mandatory Assets section for detailed description of the Mandatory Assets feature.
  • The AssetDate fields for each Asset for all the Entities associated with the pipeline meet the condition to be between the StartValidDate and EndValidDate for the Entity.

The granularity of the complete set of data is based on the field AssetDate and depends on the Sync webjob behavior.
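The completeness check can be sketched as follows. This is an illustrative Python sketch with invented names; days are plain integers standing in for AssetDate values:

```python
from collections import Counter

def complete_dates(assets, files_per_drop):
    """Dates on which every Entity has at least its FilesPerDrop Assets.
    assets: list of (entity, asset_date); files_per_drop: {entity: n}."""
    counts = Counter(assets)                      # Assets per (entity, date)
    dates = {d for _, d in assets}
    return sorted(d for d in dates
                  if all(counts[(e, d)] >= n for e, n in files_per_drop.items()))

# An Entity whose drops require three files per date.
assets = [("Sales", 1), ("Sales", 1), ("Sales", 1),   # 3 files on day 1: complete
          ("Sales", 2), ("Sales", 2)]                 # only 2 files on day 2
print(complete_dates(assets, {"Sales": 3}))  # [1]
```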

Available pipeline templates

There are several PipelineTemplates provided by Sidra which can be used to accelerate the data movement between the DSU and the Data Product. These pipelines are provided as-is and only for the purpose they are defined for. The platform is open for you to create your own pipelines with your own activities, the same as in Core, but here your pipeline should have the parameters explained in the previous points so that it can be launched by the Sync webjob.

Current pipeline templates available are:

ItemId                                Purpose
BF367329-ABE2-42CD-BF4C-8461596B961C  Execute an extraction from the DSU to the client storage and staging tables, scaling the selected database up/down during the process.
19C95A0E-3909-4299-AEE1-15604819E2B0  Execute an extraction from the DSU to the Data Product storage and staging tables.
F5170307-D08D-4F92-A9C9-92B30B9B3FF1  Extract Assets to the storage account and execute a notebook.
202BDE4E-1465-4461-9A86-492DBFFF9312  Execute an extraction from the DSU to the Data Product storage, Databricks and staging tables.

The different pipeline templates used for each Data Product can be checked in the Data Products description pages themselves.

SQL Staging Table generation

By default, for all the pipeline templates that drop content to a SQL database, the destination staging table is auto-created by the data extraction pipeline, via the execution of a stored procedure provided by Sidra.

Sidra will create the table with the same Attributes defined in the Entity, in the same order and with the same types, except for SQL_VARIANT, VARBINARY and GEOGRAPHY, which will be converted to NVARCHAR(MAX), and UNIQUEIDENTIFIER, which will be converted to VARCHAR(16). This is to guarantee compatibility between the parquet files and the Azure Data Factory Copy activity.
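The type conversions described above can be summarized as a simple lookup. This is an illustrative sketch of the mapping, not Sidra's stored procedure:

```python
# Staging-table type overrides, per the conversions described above.
STAGING_TYPE_OVERRIDES = {
    "SQL_VARIANT": "NVARCHAR(MAX)",
    "VARBINARY": "NVARCHAR(MAX)",
    "GEOGRAPHY": "NVARCHAR(MAX)",
    "UNIQUEIDENTIFIER": "VARCHAR(16)",
}

def staging_type(attribute_type):
    """Return the staging column type for an Entity Attribute type."""
    return STAGING_TYPE_OVERRIDES.get(attribute_type.upper(), attribute_type)

print(staging_type("GEOGRAPHY"))  # NVARCHAR(MAX)
print(staging_type("INT"))        # INT
```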

As the tables are recreated in every load, they don't include keys or indexes. These changes need to be explicitly included inside the orchestrator stored procedure called by the pipeline. This stored procedure also needs to ensure that the tables are optimized for the queries that will be executed.

We can distinguish between these two pipelines: the default pipeline template for extraction with database scale and the default pipeline template for extraction.

There is a default pipeline template with the ItemId BF367329-ABE2-42CD-BF4C-8461596B961C that can be used to extract content while scaling the selected database up/down during the process.

This pipeline will drop the content to the Data Product storage account and to staging tables in the SQL database created in the Data Product as well. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.

The pipeline invokes Sidra API to perform the extraction of the content to the Data Product storage. The content will be dropped in the storage account with .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.

This pipeline template performs the following steps:

  • Get the list of Entities to export from the Data Lake.
  • For those Entities to copy to staging tables, an API call to the /query Sidra Service API endpoint will copy the data from the DSU to the client storage (raw).
  • A copy data activity is called to copy that data from the raw storage to the client DB staging tables. The staging tables will have been automatically created beforehand.
  • The orchestrator SQL stored procedure with the custom business logic to create the elaborated data models is called.
  • Finally, the Asset status is changed to ImportedFromDataStorageUnit (status 3), meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, the status will be 0 (error).

The template includes the parameters described before to work with the Sync webjob, and it defines some mandatory parameters that must be populated in the ExecutionParameters:

  • storedProcedureName: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.

For example, the following ExecutionParameters section:

{
    "storedProcedureName": "[schema].[StoredProcedure]"
}

There is a default pipeline template with the ItemId 19C95A0E-3909-4299-AEE1-15604819E2B0 that can be used to extract content.

This pipeline will drop the content to the Data Product storage account and to staging tables in the SQL database created in the Data Product as well. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.

The pipeline invokes Sidra API to perform the extraction of the content to the Data Product storage. The content will be dropped in the storage account with .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.

The template includes the parameters described before to work with the Sync webjob, and it defines some mandatory parameters that must be populated in the ExecutionParameters:

  • storedProcedureName: The stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.

For example, the following ExecutionParameters section:

{
    "storedProcedureName": "[schema].[StoredProcedure]"
}

In addition, another stored procedure is called within both pipelines that inserts the tracked execution values in the ExtractPipelineExecution table.

The new pipelines will need to be created through the Sidra API from these templates. Once these pipeline instances are created in the Sidra Data Product database, in the Sidra.Pipeline table, a webjob (DataFactoryManager) is responsible for creating the underlying infrastructure in Azure Data Factory.

Stored Procedure

For loading the data into a SQL database inside the Data Product solution, for example, and adding the required business logic to generate the production tables, an orchestrator stored procedure is necessary. It can be created in the staging schema with the following structure and completed depending on the Data Product logic.

CREATE PROCEDURE [staging].[orchestrator]
    @pipelineRunId NVARCHAR(100)
AS
    SELECT 1
    -- TODO: POPULATE WITH LOGIC
RETURN 0
GO

Example of Stored Procedure

The following code is just an example depicting the structure of the orchestrator stored procedure to use in the Data Product.

CREATE PROCEDURE [staging].[orchestrator] (@pipelineRunId NVARCHAR(100))
AS
BEGIN
    DECLARE @IdPipeline INT = (SELECT TOP 1 IdPipeline FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId)
    DECLARE @IdLoad INT;
    EXECUTE @IdLoad = [Sidra].[LoadProcessLog] 1, @IdPipeline, NULL, @pipelineRunId;    
    DECLARE @AssetOrder INT = 1;
    DECLARE @MaxAssetOrder INT;
    SELECT @MaxAssetOrder = COALESCE(MAX([AssetOrder]), 0) FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId
    BEGIN TRY
        IF @MaxAssetOrder > 0
        BEGIN
            WHILE (@AssetOrder <= @MaxAssetOrder)
            BEGIN
                    --- YOUR CODE HERE
                    --- YOUR CODE HERE
                    --- YOUR CODE HERE

                SET @AssetOrder=@AssetOrder + 1
            END
            EXECUTE [Sidra].[LoadProcessLog] 2, @IdPipeline, @IdLoad, @pipelineRunId;
            EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 101;
        END
        IF @MaxAssetOrder = 0
        BEGIN
            EXECUTE [Sidra].[LoadProcessLog] 3, @IdPipeline, @IdLoad, @pipelineRunId;
        END
    END TRY
    BEGIN CATCH
        DECLARE @Message nvarchar(4000) = ERROR_MESSAGE() 
        DECLARE @ErrorSeverity INT = ERROR_SEVERITY();  
        DECLARE @ErrorState INT = ERROR_STATE();
        EXECUTE [Sidra].[LoadProcessLog] 0, @IdPipeline, @IdLoad, @pipelineRunId;
        EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 102;
        RAISERROR (@Message, @ErrorSeverity, @ErrorState);   
    END CATCH
END

Stored Procedure Templates

There are several generic stored procedures provided by Sidra to ease the custom data extraction from the Data Product Databricks tables or raw storage -depending on the business logic-, to the Data Product database tables or production/model final tables:

  • GetCreateStagingTable. This stored procedure is responsible for selecting the Attributes data from the Entities of the Data Product Databricks or raw storage, performing some conversions depending on the data type (such as sanitizing certain characters) and creating the staging tables, grouped by Entity, in the Data Product database. It also feeds the EntityStagingMapping table, where the relationship between Entities and staging table names is stored.

  • GetNextPendingIdAsset. This stored procedure, when executed through the pipeline template deployment, is in charge of detecting and loading the new Assets to the staging tables in the Data Product database. This stored procedure can be used in the orchestrator procedure.

  • UpsertExtractPipelineExecution. This stored procedure keeps the [ExtractPipelineExecution tracking table](Client-Application-tables.md#extractpipelineexecution-table) updated by inserting new registers (Asset data) when they do not exist, or updating the current ones when changes have been made. The status of the Data Product pipeline execution will then be updated with the values detailed in its table.
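The upsert semantics of that last procedure can be sketched as follows. This is an illustrative Python sketch, not the actual stored procedure; the key shape and field names are assumptions:

```python
def upsert_execution(table, key, values):
    """Sketch of upsert semantics: insert a new register when the key is
    missing, otherwise update the existing one. 'table' is a plain dict here."""
    if key in table:
        table[key].update(values)   # update the current register
    else:
        table[key] = dict(values)   # insert a new register
    return table

# Hypothetical (PipelineRunId, IdAsset) key tracked through the load.
table = {}
key = ("run-01", "asset-1")
upsert_execution(table, key, {"status": "extracting"})
upsert_execution(table, key, {"status": "finished"})
print(table[key]["status"])  # finished
```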


Last update: 2023-09-22