Add a new pipeline for a Client Application

Data Factory pipelines can be defined in JSON format and, using that JSON, created programmatically through the Data Factory API. A key component in Sidra, the Data Factory Manager for Client, composes the pipeline JSON from the templates and parameters stored in the metadata database and uses it to create the pipeline in Data Factory.
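As a rough illustration of what this composition step does (the actual implementation is internal to Sidra; the template content and helper name below are hypothetical), building a pipeline definition amounts to substituting the stored parameters into the JSON template:

```python
import json
from string import Template

# Hypothetical pipeline template, as it might be stored in the metadata
# database. Placeholders use string.Template syntax for illustration only.
PIPELINE_TEMPLATE = '''{
    "name": "$pipelineName",
    "properties": {
        "activities": [
            {
                "name": "ScaleUpDatabase",
                "type": "SqlServerStoredProcedure",
                "typeProperties": {
                    "storedProcedureName": "$storedProcedureName"
                }
            }
        ]
    }
}'''

def compose_pipeline_json(template: str, parameters: dict) -> dict:
    """Substitute stored parameters into a JSON template and parse the result."""
    rendered = Template(template).substitute(parameters)
    return json.loads(rendered)

pipeline = compose_pipeline_json(
    PIPELINE_TEMPLATE,
    {
        "pipelineName": "extract_example",
        "storedProcedureName": "[myschema].[mystoredprocedure]",
    },
)
```

The resulting dictionary is what would then be sent to the Data Factory API as the pipeline definition.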

You can find more information about how pipelines are populated in Core, how the Client Application works, and how client pipelines work.

Steps to create a Client Application pipeline

The steps to create a Client Application pipeline are broadly the same as those needed for Sidra Core and can be summarized as follows:

  1. Choose the PipelineSyncBehaviour required for the pipeline. Please refer to Client Applications pipeline sync behaviours for details on sync behaviour types.
  2. Get the Pipeline template id to use.
  3. Create the ItemId for the new pipeline to be created and store this as a variable to be used when creating the pipeline.
  4. Create an entry in the Pipeline table with the selected PipelineTemplate, IdDataFactory and IdPipelineSyncBehaviour, providing ExecutionParameters if required.
  5. Associate the Entities with the pipeline by adding the entity-pipeline relationships to the EntityPipeline table: populate it with the IdPipeline created in the previous step and the IdEntity of each of the Entities required for the pipeline workflow.
  6. Put the newly created pipeline script inside the DatabaseBuilder Client Application project.
  7. Trigger a deployment if the current environment is not configured to trigger one automatically. DatabaseBuilder will execute the SQL script to insert the content in the database, and that content will then be deployed as artifacts in Data Factory.
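The metadata produced by these steps can be sketched as follows. This is a minimal illustration, not part of Sidra: the helper is hypothetical, but the row fields mirror the Pipeline and EntityPipeline columns used in the tutorial below.

```python
import uuid

def build_pipeline_metadata(template_item_id: str, entity_ids: list,
                            sync_behaviour: int = 1,
                            execution_parameters: str = "{}") -> dict:
    """Assemble the values a Client Application pipeline script will insert
    into [DataIngestion].[Pipeline] and [DataIngestion].[EntityPipeline]."""
    pipeline_item_id = str(uuid.uuid4()).upper()        # step 3: new ItemId
    pipeline_row = {                                    # step 4: Pipeline entry
        "ItemId": pipeline_item_id,
        "TemplateItemId": template_item_id,             # resolved to an Id in SQL (step 2)
        "IdDataFactory": 1,
        "IdPipelineSyncBehaviour": sync_behaviour,      # step 1
        "ExecutionParameters": execution_parameters,
    }
    entity_pipeline_rows = [                            # step 5: Entity associations
        {"IdEntity": entity_id, "PipelineItemId": pipeline_item_id}
        for entity_id in entity_ids
    ]
    return {"pipeline": pipeline_row, "entity_pipeline": entity_pipeline_rows}
```

Steps 6 and 7 then turn these values into an SQL script and deploy it, as shown in the tutorial below.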

Tutorial: adding a basic pipeline for extracting selected entities into staging

Before starting this tutorial, note that Client Application pipelines, like the Core ones, should be associated through the EntityPipeline table so that the Sync webjob can trigger them with the right configuration. If the pipeline is not triggered by the Sync webjob, it should be triggered with a Trigger, populating the Trigger, TriggerPipeline and TriggerTemplate tables. Check this document for further information about the trigger model, which works in the same way as in Core.

1. Get the pipeline template to use

Choose the ItemId of one of the templates described and prepare an SQL statement to get the actual Id of the template. This Id will be required for the pipeline creation.

See the following example:

DECLARE @extractPipelineTemplateId INT =
(
    SELECT [Id]
    FROM [DataIngestion].[PipelineTemplate]
    WHERE [ItemId] = 'BF367329-ABE2-42CD-BF4C-8461596B961C'
)

2. Create the ItemId for the new pipeline

Create a GUID that will be used to define the ItemId of the new pipeline. This GUID could be created using Visual Studio, PowerShell or T-SQL.

For example, for T-SQL you could use the following line of code:

SELECT NEWID()
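If you prefer to generate the GUID outside T-SQL, an equivalent one-liner exists in most languages; for example, in Python (shown here purely as an illustration of NEWID()'s random-GUID behaviour):

```python
import uuid

# Equivalent of T-SQL NEWID(): a random (version 4) GUID, uppercased
# to match the format used throughout these scripts.
item_id = str(uuid.uuid4()).upper()
print(item_id)  # e.g. 6860B34B-A53F-42EF-8FD7-D2CAF5690CAF
```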

3. Store the ItemId as a variable to be used

Supposing that the previous statement returns 6860B34B-A53F-42EF-8FD7-D2CAF5690CAF, just store the resulting value in a variable:

DECLARE @extractPipelineItemId UNIQUEIDENTIFIER = '6860B34B-A53F-42EF-8FD7-D2CAF5690CAF'

4. Prepare the rollback section

To guarantee the idempotency of the script, it must begin by undoing all the changes that it is going to perform.

In this case, the script will be performing the following activities:

  • The script will insert a new pipeline based on the templates provided by Sidra.
  • The script will associate the new pipeline with several Entities through the table EntityPipeline.

The script should then ensure the following:

  1. The pipeline that is going to be inserted does not exist.
  2. The relationships between the pipeline and the Entities do not exist.

To ensure this, add the following statements:

DELETE FROM [DataIngestion].[EntityPipeline] WHERE [IdPipeline] IN (SELECT [Id] FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @extractPipelineItemId)
DELETE FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @extractPipelineItemId

If more items are added, they should be removed at the start of the script in the same way.

5. Add the new pipeline

To add the new pipeline, the following Ids are required:

  • ItemId as a unique id for the pipeline in the system. This is the value calculated in step 3 as @extractPipelineItemId.
  • IdTemplate is the id of the template used, calculated in the step 1 as @extractPipelineTemplateId.
  • IdDataFactory is the Data Factory used to deploy this pipeline. It can be found in the [DataIngestion].[DataFactory] table; in a typical Client Application there is only one, so the value 1 will be used, assuming that Id exists in the table.
  • IdPipelineSyncBehaviour: the sync behaviour types were described above; by default, a value of 1 will be assumed as the standard behaviour.
  • ExecutionParameters: The pipeline template used provides parameters to scale the database and to execute a stored procedure. Therefore these parameters should be provided:
{
    "storedProcedureName": "[myschema].[mystoredprocedure]",
    "scaleUpCapacity":"50", 
    "scaleUpTierName":"S2" 
}
INSERT INTO [DataIngestion].[Pipeline] ([ItemId],[Name],[ValidFrom],[ValidUntil],[IdTemplate],[LastUpdated],[LastDeployed],[IdDataFactory],[IsRemoved],[IdPipelineSyncBehaviour],[Parameters],[ExecutionParameters])
VALUES 
(@extractPipelineItemId, N'extract_example', GETUTCDATE(), NULL, @extractPipelineTemplateId, GETUTCDATE(), NULL, 1, 0, 1, N'{}', N'{
    "storedProcedureName": "[myschema].[mystoredprocedure]",
    "scaleUpCapacity":"50", 
    "scaleUpTierName":"S2" 
}')
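Because ExecutionParameters is stored as a JSON string inside the INSERT, a stray quote or comma can silently break it. A small check like the following (the helper is ours, not part of Sidra) can validate the value before pasting it into the script:

```python
import json

# Keys this pipeline template expects in ExecutionParameters.
REQUIRED_KEYS = {"storedProcedureName", "scaleUpCapacity", "scaleUpTierName"}

def validate_execution_parameters(raw: str) -> dict:
    """Parse an ExecutionParameters JSON string and check the expected keys."""
    params = json.loads(raw)  # raises ValueError if the JSON is malformed
    missing = REQUIRED_KEYS - params.keys()
    if missing:
        raise ValueError(f"ExecutionParameters missing keys: {sorted(missing)}")
    return params

params = validate_execution_parameters('''{
    "storedProcedureName": "[myschema].[mystoredprocedure]",
    "scaleUpCapacity": "50",
    "scaleUpTierName": "S2"
}''')
```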

6. Add the Entity-pipeline relationships using an SQL script

In order to extract the data from the desired Entities in the DSU, the pipeline must be associated with them. In the following example, assume that the Entities with Ids 1, 3 and 5 are the ones to extract data from.

First, get the Id of the pipeline created:

DECLARE @extractPipelineId INT =
(
    SELECT [Id]
    FROM [DataIngestion].[Pipeline]
    WHERE [ItemId] = @extractPipelineItemId
)

Second, insert the relationships between the Entities and the Pipeline:

INSERT INTO [DataIngestion].[EntityPipeline]([IdEntity],[IdPipeline],[IsMandatory],[PipelineExecutionProperties])
VALUES
(1, @extractPipelineId, 1, NULL),
(3, @extractPipelineId, 1, NULL),
(5, @extractPipelineId, 1, NULL)

7. Complete the client script

DECLARE @extractPipelineTemplateId INT =
(
    SELECT [Id]
    FROM [DataIngestion].[PipelineTemplate]
    WHERE [ItemId] = 'BF367329-ABE2-42CD-BF4C-8461596B961C'
)

DECLARE @extractPipelineItemId UNIQUEIDENTIFIER = '6860B34B-A53F-42EF-8FD7-D2CAF5690CAF'

-- Cleanup
DELETE FROM [DataIngestion].[EntityPipeline] WHERE [IdPipeline] IN (SELECT [Id] FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @extractPipelineItemId)
DELETE FROM [DataIngestion].[Pipeline] WHERE [ItemId] = @extractPipelineItemId

-- Script content
INSERT INTO [DataIngestion].[Pipeline] ([ItemId],[Name],[ValidFrom],[ValidUntil],[IdTemplate],[LastUpdated],[LastDeployed],[IdDataFactory],[IsRemoved],[IdPipelineSyncBehaviour],[Parameters],[ExecutionParameters])
VALUES 
(@extractPipelineItemId, N'extract_example', GETUTCDATE(), NULL, @extractPipelineTemplateId, GETUTCDATE(), NULL, 1, 0, 1, N'{}', N'{
"storedProcedureName": "[myschema].[mystoredprocedure]",
"scaleUpCapacity":"50", 
"scaleUpTierName":"S2" 
}')

DECLARE @extractPipelineId INT =
(
    SELECT [Id]
    FROM [DataIngestion].[Pipeline]
    WHERE [ItemId] = @extractPipelineItemId
)

INSERT INTO [DataIngestion].[EntityPipeline]([IdEntity],[IdPipeline],[IsMandatory],[PipelineExecutionProperties])
VALUES
(1, @extractPipelineId, 1, NULL),
(3, @extractPipelineId, 1, NULL),
(5, @extractPipelineId, 1, NULL)

8. Put your pipeline inside DatabaseBuilder

Add an SQL script following the naming conventions to the Scripts\ClientContext folder, or to the location configured in DatabaseBuilder for retrieving scripts. Ensure that the script runs without errors and that it is marked with "CopyAlways" so that it is included in the build output.

9. Push the changes into the branch

Trigger a deployment if the current environment is not configured to trigger one automatically, and wait for the result. The DatabaseBuilder job will execute the SQL script to insert the content in the Client Application database, and then the DataFactoryManager for client will deploy that content as artifacts in the client Data Factory.
