Sidra Client pipelines¶
The goal of this document is to explain in more detail the concepts behind Data Product pipelines.
PipelineSyncBehaviour table and Pipeline table¶
`PipelineSyncBehaviour` is a table that defines the different types of behaviour that can be executed from the Sync webjob perspective. The Sync webjob is responsible for synchronizing metadata and executing the pipelines associated with the Entities (see Role of Sync and DatabaseBuilder webjobs in Data Products).

The `Pipeline` table contains a field pointing to the Sync behaviour. The field `ExecutionParameters` is used for including values in the parameters of the pipeline when the pipeline is executed by the Sync webjob.

When a pipeline is executed, the Sync webjob provides several parameters to it. Depending on the Sync behaviour, the pipelines will be:
- `Ignored`: The pipeline will be ignored by the Sync webjob, so it should probably be executed explicitly using a Trigger.
- `LoadAllNewDates`: All completed date Assets (see below on complete set of data) will be included to be extracted by the Sync webjob execution of the pipeline.
- `LoadFirstNewDate`: Only the oldest completed set of Asset data will be included to be extracted by the Sync webjob execution of the pipeline.
- `LoadLastNewDate`: Only the newest completed set of Asset data will be included to be extracted by the Sync webjob execution of the pipeline.
- `LoadPendingUpToValidDate`: The Sync webjob will instruct the pipeline to load all pending Assets of any date up to the last completed date.
- `LoadUpToLastValidDate`: From version 1.12 (2022.R2) onwards, this Sync behaviour checks those Assets that have not yet been added for a specific pipeline, but which are in the Assets table; these Assets will be considered for the next Data Product pipeline load. It works similarly to `LoadPendingUpToValidDate`, but checks the `ExtractPipelineExecution` table in order to work with multiple pipelines. This is the mandatory Sync behaviour for loading an Entity with several pipelines, and the recommended behaviour for all new load pipelines. It will retry unfinished Assets per pipeline a number of times (3 by default); this can be changed through the App Service configuration by setting the `Sync.AssetRetries` key.
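As an illustration, configuring a pipeline to use one of these behaviours amounts to pointing the pipeline's Sync-behaviour field at the corresponding `PipelineSyncBehaviour` row. The column names and pipeline name below are illustrative assumptions, not the verified schema; check the Data Product metadata database for the actual definitions:

```sql
-- Hypothetical sketch: column names and the pipeline name are assumptions.
UPDATE [Sidra].[Pipeline]
SET [IdPipelineSyncBehaviour] = (
        SELECT [Id]
        FROM [Sidra].[PipelineSyncBehaviour]
        WHERE [Name] = 'LoadUpToLastValidDate'  -- recommended for new load pipelines
    )
WHERE [Name] = 'MyDataProductLoadPipeline';     -- hypothetical pipeline name
```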
Considerations for PipelineSyncBehaviour table¶
The behaviours `LoadPendingUpToValidDate` or `LoadUpToLastValidDate` (the latter only since version 2022.R2) are required in order to avoid situations like the following:

- If there is an error in the Data Intake Process (Sidra Service ingestion, incremental loads), some of the Entities are loaded but the failed Entities are not. The overall status of the job is `Succeeded`, because this way we do not block the iterative batch process in the pipeline to load all Entities. When this issue happens, e.g. if there is a set of three Entities that are marked as mandatory on the Data Product side (table `EntityPipeline`), the Sync job will not trigger the pipeline until all three Assets have been generated, which could take several days (e.g. three days, assuming that on day 3 we finally have the three mandatory Assets).
Sync behaviour `LoadAllNewDates` is only capable of loading the completed date Assets available on the date of execution of the Sync webjob, leaving the already loaded increments in Sidra Service without loading them into the Data Product staging tables. For some Data Product business logic, we may need to know exactly on which days there were Assets that were not processed, so that the Data Product can retrieve this missing data.
Sync behaviours `LoadPendingUpToValidDate` or `LoadUpToLastValidDate` load any pending Assets up to the most recent day for which there are available Assets for every mandatory Entity.
In the example of the three Assets above, this means: any Assets (deltas) available on day 1 and any Assets (deltas) available on day 2.
Considerations for multiple pipelines per Entity¶
A pipeline execution status table (`Sidra.ExtractPipelineExecution`) has been implemented, which tracks the execution status per Asset, per Entity and per Data Product pipeline. This Sync behaviour checks those Assets that have not yet been added for a specific pipeline, but which are in the `Assets` table. These Assets will therefore be considered for the next Data Product pipeline load. If these Assets are in status 3 (`ImportedFromDataStorageUnit`), that status will be ignored, because the table now refers to the `ExtractPipelineExecution` status.

This table is filled with the corresponding status (`extracting`, `staging`, `finished`, `error`) by an activity in the Data Product pipeline. The Data Product pipeline templates use this `PipelineSyncBehavior`.
The Sync behaviour `LoadUpToLastValidDate` works similarly to `LoadPendingUpToValidDate`, but checks the `ExtractPipelineExecution` table in order to work with multiple pipelines. `LoadUpToLastValidDate` is the mandatory `PipelineSyncBehavior` to configure if multiple Data Product pipelines need to be executed. With this behaviour, Sync retries unfinished Assets per pipeline (3 attempts by default). This can be changed through the App Service configuration by setting the `Sync.AssetRetries` key.
When configuring the Sync behavior for executing different Data Product pipelines per Entity, we also need to consider the following:
- The Data Product templates automatically fill in the value in the `ExtractPipelineExecution` table. This is done by an internal stored procedure called `[Sidra].[UpsertExtractPipelineExecution]`. This step is performed automatically, without any need to add a configuration parameter to the Data Product pipeline.
- This insertion into the `ExtractPipelineExecution` table is performed at the beginning of the corresponding Data Product template.
Sync pipeline requirements¶
All the pipelines called by the Sync webjob (all except those with behaviour `Ignored`) require using a `PipelineTemplate` (table) with some required parameters, apart from the `ExecutionParameters`.

The values of the pipeline parameters will be populated automatically by the Sync webjob, and you can see them in the `Template` field of the `PipelineTemplate` table:
- `storageName`: The storage name used to drop the content. This storage account is created by the deployment of the Data Product.
- `storageToken`: The SAS container token of the storage used.
- `containerName`: The storage container name where the content will be dropped.
- `queryParameters`: Some parameters are populated here by the Sync webjob, which include the Assets that are going to be extracted per Entity from the Data Storage Unit.
- `overwrite`: By default this value is `True`. This parameter can be overridden in the `ExecutionParameters` field for the pipeline to indicate whether the content will be either appended, or cleaned and dropped.
From the point of view of the `PipelineTemplate`, an example of the JSON section (`Template` field) with the minimum required parameters is the following:
```json
"parameters": {
    "storageName": {
        "type": "string"
    },
    "storageToken": {
        "type": "string"
    },
    "containerName": {
        "type": "string"
    },
    "queryParameters": {
        "type": "array"
    },
    "overwrite": {
        "type": "string",
        "defaultValue": "true"
    }
}
```
There may be more parameters, depending on the PipelineTemplate
.
Extraction pipelines configuration¶
The extraction pipeline execution is launched by the Sync
webjob based on the information in the Data Product metadata database. Such information includes metadata synchronized from Sidra Service, but also the configuration of the data extraction pipelines.
The configuration of the data extraction pipelines from the DSU is done in the metadata database tables, in a very similar way to how the ingestion and extraction pipelines are configured in Sidra Service.
Once the specific extraction pipeline is configured, it can be created by setting up the appropriate information in the metadata database and allowing the DataFactoryManager
job to create this pipeline in ADF.
The basic configuration of the pipeline follows the same steps as the data intake pipelines configuration for Sidra Service. However, in Data Products, some additional configuration is required.
Step-by-step
Create a new pipeline for a Data Product
For more information, check the specific tutorial for creating a new pipeline for a Data Product.
Associate Entity-Pipeline
For more information, check the specific tutorial for associating Entity and pipelines.
Mandatory Assets¶
The same data extraction pipeline can be used to extract several Assets from different Entities. As in Sidra Service, the `EntityPipeline` table is used to associate the same Data Product pipeline with several Entities.

For the Sync process to copy the data from the DSU, it is necessary to assign all Entities to their corresponding pipelines. For this, all the relationships between the Entities and the Data Product data extraction pipeline to use must be inserted in the `EntityPipeline` table of the Data Product Sidra database.

Sidra also provides support for defining mandatory Assets, which means that the extraction pipeline will be executed only if all the Assets marked as mandatory are present in Sidra Service, so that there is at least one Asset for each of the mandatory Entities. We can mark certain Entities as optional (`IsMandatory`=0) when it is not required that their Assets are loaded in Sidra Service.

The mandatory Assets can be configured using the field `IsMandatory`, which is included in the `EntityPipeline` table and must be taken into account when setting up this relation.
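As a sketch, associating an Entity with a Data Product pipeline and marking it as mandatory could look like the following insert. The ID values are placeholders and the column names are assumptions based on the description above, not the verified schema:

```sql
-- Hypothetical sketch: IDs are placeholders, column names are assumptions.
INSERT INTO [Sidra].[EntityPipeline] ([IdEntity], [IdPipeline], [IsMandatory])
VALUES (101, 7, 1);  -- 1 = mandatory; use 0 for optional Entities
```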
Step-by-step
Associate Entity-Pipeline
Please refer to the specific tutorial for associating Entity and pipelines.
Summary
Summarizing all the concepts above, the extraction pipelines will be launched by the Sync webjob once it has checked the following conditions:
- The PipelineSyncBehaviour is not Ignored.
- The pipeline is not associated to a Provider through an Entity that is marked as disabled.
- The pipeline is not being executed.
- There are Assets ready to be exported i.e., the AssetStatus is MovedToDataLake or ImportingFromDataLake.
- If the relationship between the pipeline and the Entity contains the flag IsMandatory=true, the dates of the Assets to be exported are between the StartValidDate and EndValidDate of the Entity.
- There are at least as many Assets ready to be exported as the number indicated in the Entity field FilesPerDrop.
Some additional considerations:
- An additional field IsMandatory is included in the association between Pipeline and Entity and must be taken into account when setting up this relation using the tutorial How to associate an Entity with a pipeline.
- Pipelines in Data Products have an additional field, ExecutionParameters, which must be configured.
Complete set of data¶
When an `Entity` is defined, there is a field named `FilesPerDrop` (see the metadata description for further info), which refers to how many files are going to be loaded for the same date for that given Entity.
For example, let's suppose that for a given Entity, every drop of data in Sidra Service requires at least three different files to be dropped to the landing zone and ingested into Sidra.
This means that, for the data to be considered complete, there should be at least the minimum number of `Assets` available for the selected `Entities`; three Assets, following the example.
In summary, the conditions to fulfill a complete set of data for a pipeline to run are the following:
- The Entities are associated with the pipeline (through the Entity-Pipeline association, see the `EntityPipeline` information).
- The `EntityPipeline` relationship has been tagged as mandatory. See the mandatory Assets section for a detailed description of the mandatory Assets feature.
- The `AssetDate` fields for each Asset for all the Entities associated with the pipeline meet the condition to be between the `StartValidDate` and `EndValidDate` for the Entity.
The granularity of the complete set of data is based on the field `AssetDate` and depends on the Sync webjob behaviour.
Available pipeline templates¶
There are several `PipelineTemplates` provided by Sidra which can be used to accelerate the data movement between the DSU and the Data Product. The pipelines are provided as-is and only for the purpose they are defined for. The platform is open for creating your own pipelines with your own activities, the same as in Core, but your pipeline should have the defined parameters explained in the previous points so that it can be launched by the Sync webjob.
Current pipeline templates available are:
| ItemId | Purpose |
|---|---|
| BF367329-ABE2-42CD-BF4C-8461596B961C | Execute an extraction from the DSU to the client storage and staging tables, scaling up/down the selected database during the process. |
| 19C95A0E-3909-4299-AEE1-15604819E2B0 | Execute an extraction from the DSU to the Data Product storage and staging tables. |
| F5170307-D08D-4F92-A9C9-92B30B9B3FF1 | Extract Assets to the storage account and execute a notebook. |
| 202BDE4E-1465-4461-9A86-492DBFFF9312 | Execute an extraction from the DSU to the Data Product storage, Databricks and staging tables. |
The different pipeline templates used for each Data Product can be checked in the Data Products description pages themselves.
SQL Staging Table generation¶
By default, for all the pipeline templates that drop the content to an SQL database, the destination staging table is auto-created by the data extraction pipeline, via the execution of a stored procedure provided by Sidra.
Sidra will create the table with the same Attributes defined in the Entity, in the same order and with the same types, except for `SQL_VARIANT`, `VARBINARY`, `GEOGRAPHY` and `UNIQUEIDENTIFIER`: the first three types will be converted to `NVARCHAR(MAX)`, and `UNIQUEIDENTIFIER` to `VARCHAR(16)`. This is to guarantee the compatibility between the parquet files and the Azure Data Factory Copy activity.
As the tables are recreated in every load, they don't include keys or indexes. These changes need to be explicitly included inside the orchestrator stored procedure called by the pipeline. This stored procedure also needs to ensure that the tables are optimized for the queries that will be executed.
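For example, since the staging tables are recreated on every load, an index needed by the downstream queries can be (re)created inside the orchestrator stored procedure after the load. The staging table and column names below are hypothetical:

```sql
-- Hypothetical staging table and column; adapt to your Data Product model.
IF NOT EXISTS (SELECT 1 FROM sys.indexes
               WHERE [name] = 'IX_Customer_Id'
                 AND [object_id] = OBJECT_ID('[staging].[Customer]'))
BEGIN
    CREATE NONCLUSTERED INDEX [IX_Customer_Id]
        ON [staging].[Customer] ([Id]);
END
```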
We can distinguish between these two pipelines: the default pipeline template for extraction with database scale and the default pipeline template for extraction.
There is a default pipeline template with the ItemId
19C95A0E-3909-4299-AEE1-15604819E2B0 that can be used to extract content.
This pipeline will drop the content to the Data Product storage account and to staging tables in the SQL database created in the Data Product as well. The pipeline also incorporates a further step to execute a stored procedure on the data in the staging tables, so that it can be incorporated into the production tables.
The pipeline invokes Sidra API to perform the extraction of the content to the Data Product storage. The content will be dropped in the storage account with .parquet format. The content is also copied to the staging tables via an Azure Data Factory copy activity.
This pipeline template performs the following steps:
- Get the list of Entities to export from the Data Lake.
- For those Entities to copy to staging tables, an API call to /query Sidra Service API endpoint will copy the data from the DSU to the client storage (raw).
- A copy data activity is called to copy that data from the raw storage to the client DB staging tables. The staging tables will have been previously automatically created.
- The orchestrator SQL stored procedure for the custom business logic to create the elaborated data models is called.
- Finally, the Asset status is changed to `ImportedFromDataStorageUnit` (status 3), meaning that the Asset has been imported from the Data Storage Unit into the client database. Otherwise, the status will be 0 (error).
The template includes the parameters described before to work with the Sync webjob, and it provides some mandatory parameters that need to be populated in the `ExecutionParameters`:

- `storedProcedureName`: The SQL stored procedure that is going to be invoked after all the Entities have dropped the content. This stored procedure will need to be created before deploying a pipeline from this template.
The name of this stored procedure is provided to the pipeline through the `ExecutionParameters` field.
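A minimal `ExecutionParameters` value could therefore look like the following JSON fragment. The stored procedure name is illustrative, here reusing the orchestrator procedure shown later in this document:

```json
{
    "storedProcedureName": "[staging].[orchestrator]"
}
```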
In addition, another stored procedure is called within both pipelines that inserts the tracked execution values in the ExtractPipelineExecution table.
The new pipelines will need to be created through the Sidra API from these templates. Once these pipeline instances are created in the `Sidra.Pipeline` table of the Sidra Data Product database, a webjob is responsible for creating the underlying infrastructure in Azure Data Factory.
Stored Procedure¶
For loading the data into an SQL database inside the Data Product solution, and for adding the required business logic to generate the production tables, an orchestrator stored procedure is necessary. It can be created in the staging schema with the following structure, and completed depending on the Data Product logic.
```sql
CREATE PROCEDURE [staging].[orchestrator]
    @pipelineRunId NVARCHAR(100)
AS
    SELECT 1
    -- TODO: POPULATE WITH LOGIC
    RETURN 0
GO
```
Example of Stored Procedure¶
The following code is just an example depicting the structure of the orchestrator stored procedure to use in the Data Product.
```sql
CREATE PROCEDURE [staging].[orchestrator] (@pipelineRunId NVARCHAR(100))
AS
BEGIN
    DECLARE @IdPipeline INT = (SELECT TOP 1 IdPipeline FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId)
    DECLARE @IdLoad INT;
    EXECUTE @IdLoad = [Sidra].[LoadProcessLog] 1, @IdPipeline, NULL, @pipelineRunId;
    DECLARE @AssetOrder INT = 1;
    DECLARE @MaxAssetOrder INT;
    SELECT @MaxAssetOrder = COALESCE(MAX([AssetOrder]), 0) FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId
    BEGIN TRY
        IF @MaxAssetOrder > 0
        BEGIN
            WHILE (@AssetOrder <= @MaxAssetOrder)
            BEGIN
                --- YOUR CODE HERE
                SET @AssetOrder = @AssetOrder + 1
            END
            EXECUTE [Sidra].[LoadProcessLog] 2, @IdPipeline, @IdLoad, @pipelineRunId;
            EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 101;
        END
        IF @MaxAssetOrder = 0
        BEGIN
            EXECUTE [Sidra].[LoadProcessLog] 3, @IdPipeline, @IdLoad, @pipelineRunId;
        END
    END TRY
    BEGIN CATCH
        DECLARE @Message NVARCHAR(4000) = ERROR_MESSAGE()
        DECLARE @ErrorSeverity INT = ERROR_SEVERITY();
        DECLARE @ErrorState INT = ERROR_STATE();
        EXECUTE [Sidra].[LoadProcessLog] 0, @IdPipeline, @IdLoad, @pipelineRunId;
        EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 102;
        RAISERROR (@Message, @ErrorSeverity, @ErrorState);
    END CATCH
END
```
Stored Procedure Templates¶
There are several generic stored procedures provided by Sidra to ease the custom data extraction from the Data Product Databricks tables or raw storage (depending on the business logic) to the Data Product database tables or production/model final tables:

- `GetCreateStagingTable`: This stored procedure is responsible for selecting the Attributes data from the Entities of the Data Product Databricks or raw storage, doing some conversions depending on the data type (such as sanitizing certain characters) and creating the staging tables, grouped by Entity, in the Data Product database. It also populates the `EntityStagingMapping` table, where the relationship between Entities and staging table names is stored.
- `GetNextPendingIdAsset`: This stored procedure, when executed through the pipeline template deployment, is in charge of detecting and loading the new Assets into the staging tables in the Data Product database. It can be used in the orchestrator procedure.
- `UpsertExtractPipelineExecution`: This stored procedure keeps the `ExtractPipelineExecution` tracking table updated by inserting new registers (Asset data) when they do not exist, or updating the current ones when changes have been made. The status of the Data Product pipeline execution will then be updated with the values detailed in its table.