Skip to content

Stored Procedure for Data Products

For loading the data into an SQL database - which is inside of the Data Product solution -, for example, and adding the required business logic to generate the production tables, an orchestrator stored procedure will be necessary. This can be created in the staging schema with the following structure and completed depending on the Data Product logic.

CREATE PROCEDURE [staging].[orchestrator]
    @pipelineRunId NVARCHAR(100)
AS
    SELECT 1
    -- TODO: POPULATE WITH LOGIC
RETURN 0
GO

Example of Stored Procedure

The following code is just an example depicting the structure of the orchestrator stored procedure to use in the Data Product.

CREATE PROCEDURE [staging].[orchestrator] (@pipelineRunId NVARCHAR(100))
AS
BEGIN
    DECLARE @IdPipeline INT = (SELECT TOP 1 IdPipeline FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId)
    DECLARE @IdLoad INT;
    EXECUTE @IdLoad = [Sidra].[LoadProcessLog] 1, @IdPipeline, NULL, @pipelineRunId;    
    DECLARE @AssetOrder INT = 1;
    DECLARE @MaxAssetOrder INT;
    SELECT @MaxAssetOrder = COALESCE(MAX([AssetOrder]), 0) FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId
    BEGIN TRY
        IF @MaxAssetOrder > 0
        BEGIN
            WHILE (@AssetOrder <= @MaxAssetOrder)
            BEGIN
                    --- YOUR CODE HERE
                    --- YOUR CODE HERE
                    --- YOUR CODE HERE

                SET @AssetOrder=@AssetOrder + 1
            END
            EXECUTE [Sidra].[LoadProcessLog] 2, @IdPipeline, @IdLoad, @pipelineRunId;
            EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 101;
        END
        IF @MaxAssetOrder = 0
        BEGIN
            EXECUTE [Sidra].[LoadProcessLog] 3, @IdPipeline, @IdLoad, @pipelineRunId;
        END
    END TRY
    BEGIN CATCH
        DECLARE @Message nvarchar(4000) = ERROR_MESSAGE() 
        DECLARE @ErrorSeverity INT = ERROR_SEVERITY();  
        DECLARE @ErrorState INT = ERROR_STATE();
        EXECUTE [Sidra].[LoadProcessLog] 0, @IdPipeline, @IdLoad, @pipelineRunId;
        EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 102;
        RAISERROR (@Message, @ErrorSeverity, @ErrorState);   
    END CATCH
END

Stored Procedure Templates

There are several generic stored procedures provided by Sidra to ease the custom data extraction from the Data Product Databricks tables or raw storage -depending on the business logic-, to the Data Product database tables or production/model final tables:

  • CreateStagingTable. This stored procedure is responsible for selecting the Attributes data from the Entities of the Data Product Databricks or raw storage, doing some conversions depending on the data type -as sanitize certain characters- and creating the staging tables, group by Entity, in the Data Product database.

  • GetNextPendingIdAsset. This stored procedure, when executed through the pipeline template deployment, is in charge of detecting and loading the new Assets to the staging tables in the Data Product database.

  • UpsertExtractPipelineExecution. In this case, this store procedure will maintain the ExtractPipelineExecution tracking table updated via inserting new registers (Asset data) when they do not exist, or updating the current ones when changes have made. Then, the status of the Data Product pipeline execution will be updated with the values detailed in its table.