Add a new pipeline for a Data Product¶
Data Factory pipelines can be defined in a JSON format. Using that JSON, they can be created programmatically through the Data Factory API. A key component in Sidra, the Data Factory Manager for Client, composes the pipeline JSON from the templates and parameters stored in the metadata database and uses it to create the pipeline in Data Factory.
You can find more information about how the Data Product works and how client pipelines work. Check how to create a new Data Product for more details.
Summary: Steps to create a Data Product pipeline
The steps to create a Data Product pipeline are similar to those needed for Sidra Core and can be summarized as follows:
- Choose the PipelineSyncBehaviour required for the pipeline. Please refer to Data Products pipeline sync behaviours.
- Get the Pipeline template id to use.
- Create the ItemId for the new pipeline to be created and store this as a variable to be used when creating the pipeline.
- Create an entry in the Pipeline table with the selected PipelineTemplate, IdDataFactory and IdPipelineSyncBehaviour, providing information in Parameters if required.
- Associate the Entities with the pipeline by adding the Entity-Pipeline relationship in the EntityPipeline table. Populate the EntityPipeline table with the IdPipeline created in the previous step and the IdEntity of each of the Entities required for the pipeline workflow.
- Put the newly created pipeline script inside the DatabaseBuilder Data Product project.
- Raise a deployment, if the current environment is not configured to raise one automatically. DatabaseBuilder will execute the SQL script to load the content into the Data Product database, and the Data Factory Manager will deploy that content as artifacts in Data Factory.
Tutorial: Adding a basic pipeline for extracting selected Entities into staging¶
Before starting this tutorial, note that Data Product pipelines -as well as the Core ones- should be associated in the EntityPipeline table in order to be raised from the Sync webjob with the right configuration. If the pipeline is not being raised by the Sync webjob, it should be raised with a Trigger, populating the tables Trigger, TriggerPipeline and TriggerTemplate. Check this document for further information about the trigger model, which works in the same way as in Core.
A full explanation of the process below can be found in the page Create a new Data Product.
1. Get the pipeline template to use¶
Choose the ItemId of one of the templates described and prepare an SQL statement to get the actual Id of the template. This Id will be required for the pipeline creation.
See the following example:
DECLARE @ExtractPipelineTemplateId INT =
(
SELECT [Id]
FROM [DataIngestion].[PipelineTemplate]
WHERE [ItemId] = 'BF367329-ABE2-42CD-BF4C-8461596B961C'
)
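If you are unsure which templates are available, a query along these lines lists the candidates (a minimal sketch; only the Id and ItemId columns used in this tutorial are assumed):
SELECT [Id], [ItemId]
FROM [DataIngestion].[PipelineTemplate]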
2. Create the ItemId for the new pipeline¶
Create a GUID that will be used to define the ItemId of the new pipeline. This GUID could be created using Visual Studio, PowerShell or T-SQL.
For example, for T-SQL you could use the following line of code:
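SELECT NEWID()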
3. Store the ItemId as a variable to be used¶
Supposing that the previous statement returns 6860B34B-A53F-42EF-8FD7-D2CAF5690CAF as a value, just store this resulting value in a variable:
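DECLARE @ExtractPipelineItemId UNIQUEIDENTIFIER = '6860B34B-A53F-42EF-8FD7-D2CAF5690CAF'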
4. Prepare the rollback section¶
To guarantee the idempotency of the script, it must begin by removing any changes that a previous execution may have left behind.
In this case, the script will be performing the following activities:
- The script will insert a new pipeline based on the templates provided by Sidra.
- The script will associate the new pipeline with several Entities through the EntityPipeline table.
The script should then ensure the following:
- The pipeline that is going to be inserted does not exist.
- The relationships between the pipeline and the Entities do not exist.
To ensure this, add the following statements:
DELETE FROM [DataIngestion].[EntityPipeline] WHERE [IdPipeline] IN
(SELECT [Id] FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @ExtractPipelineItemId)
DELETE FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @ExtractPipelineItemId
If more items are added to the script, they should be removed in the same way as well.
5. Add the new pipeline¶
For adding the new pipeline, the following Ids are required:
- ItemId: a unique id for the pipeline in the system. This is the value calculated in step 3 as @ExtractPipelineItemId.
- IdTemplate: the id of the template used, calculated in step 1 as @ExtractPipelineTemplateId.
- IdDataFactory: the Data Factory used to deploy this pipeline. It can be found in the [DataIngestion].[DataFactory] table; normally there is only one in the common Data Product, so the value 1 will be used, assuming that Id exists in the related table.
- IdPipelineSyncBehaviour: described in the section above; by default, a value of 1 will be assumed as the standard behaviour.
- ExecutionParameters: the pipeline template used provides parameters to scale the database and to execute a stored procedure.
INSERT INTO [DataIngestion].[Pipeline]
([ItemId],[Name],[ValidFrom],[ValidUntil],[IdTemplate],[LastUpdated],[LastDeployed],
[IdDataFactory],[IsRemoved],[IdPipelineSyncBehaviour],[Parameters],[ExecutionParameters])
VALUES
(@ExtractPipelineItemId, N'ExtractScalingDB', GETUTCDATE(), NULL, @ExtractPipelineTemplateId,
GETUTCDATE(), NULL, 1, 0, 1, N'{}', N'{
"storedProcedureName": "[staging].[orchestrator]",
"scaleUpCapacity":"<CAPACITY E.G. 50>",
"scaleUpTierName":"<TIER NAME E.G. S2>"}'
)
6. Add Entity-Pipeline relationship using a SQL script¶
In order to extract the data from the desired Entities in the DSU, the pipeline must be associated with them. In the following example, let's assume that the Entities with Id 1, 3 and 5 are the Entities required to extract data from.
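If you need to confirm which Entity Ids to use, a lookup along these lines can help (a sketch; it assumes the synchronized Entity metadata lives in a [DataIngestion].[Entity] table with Id and Name columns, which may differ in your Data Product database):
SELECT [Id], [Name]
FROM [DataIngestion].[Entity]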
The first step is to get the Id of the pipeline just created:
DECLARE @ExtractPipelineId INT =
(
SELECT [Id]
FROM [DataIngestion].[Pipeline]
WHERE [ItemId] = @ExtractPipelineItemId
)
Second, insert the relationship between the Entity and the Pipeline:
INSERT INTO [DataIngestion].[EntityPipeline]
([IdEntity],[IdPipeline],[IsMandatory],[PipelineExecutionProperties])
VALUES
(1, @ExtractPipelineId, 1, NULL),
(3, @ExtractPipelineId, 1, NULL),
(5, @ExtractPipelineId, 1, NULL)
7. Complete script¶
DECLARE @ExtractPipelineTemplateId INT =
(
SELECT [Id]
FROM [DataIngestion].[PipelineTemplate]
WHERE [ItemId] = 'BF367329-ABE2-42CD-BF4C-8461596B961C'
)
DECLARE @ExtractPipelineItemId UNIQUEIDENTIFIER = '6860B34B-A53F-42EF-8FD7-D2CAF5690CAF'
-- Cleanup
DELETE FROM [DataIngestion].[EntityPipeline] WHERE [IdPipeline] IN
(SELECT [Id] FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @ExtractPipelineItemId)
DELETE FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @ExtractPipelineItemId
-- Script content
INSERT INTO [DataIngestion].[Pipeline]
([ItemId],[Name],[ValidFrom],[ValidUntil],[IdTemplate],[LastUpdated],[LastDeployed],
[IdDataFactory],[IsRemoved],[IdPipelineSyncBehaviour],[Parameters],[ExecutionParameters])
VALUES
(@ExtractPipelineItemId, N'ExtractScalingDB', GETUTCDATE(), NULL, @ExtractPipelineTemplateId,
GETUTCDATE(), NULL, 1, 0, 1, N'{}', N'{
"storedProcedureName": "[staging].[orchestrator]",
"scaleUpCapacity":"<CAPACITY E.G. 50>",
"scaleUpTierName":"<TIER NAME E.G. S2>"}'
)
DECLARE @ExtractPipelineId INT =
(
SELECT [Id]
FROM [DataIngestion].[Pipeline]
WHERE [ItemId] = @ExtractPipelineItemId
)
INSERT INTO [DataIngestion].[EntityPipeline]
([IdEntity],[IdPipeline],[IsMandatory],[PipelineExecutionProperties])
VALUES
(1, @ExtractPipelineId, 1, NULL),
(3, @ExtractPipelineId, 1, NULL),
(5, @ExtractPipelineId, 1, NULL)
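To verify the result, after the script completes you can run a quick check like the following (a minimal sketch based on the tables used above, assuming the example ItemId):
SELECT p.[Id], p.[Name], ep.[IdEntity]
FROM [DataIngestion].[Pipeline] p
LEFT JOIN [DataIngestion].[EntityPipeline] ep ON ep.[IdPipeline] = p.[Id]
WHERE p.[ItemId] = '6860B34B-A53F-42EF-8FD7-D2CAF5690CAF'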
8. Put your pipeline inside DatabaseBuilder¶
Add an SQL script following the naming conventions to the Scripts\ClientContext folder, or to the location configured in the DatabaseBuilder from which to retrieve the scripts. Ensure that the script runs without errors and is marked with "CopyAlways" so that it is included in the output of the code compilation. You can find more information about this step here.
9. Push the changes into the branch¶
Raise a deployment, if the current environment is not configured to raise one automatically, and wait for the result. The DatabaseBuilder job will execute the SQL script to load the content into the Data Product database, and then the DataFactoryManager for client will deploy the content from the Data Product database as artifacts in the client Data Factory.