Stored Procedure for Data Products¶
To load data into the SQL database inside the Data Product solution, for example, and to apply the business logic that generates the production tables, an orchestrator stored procedure is required. It can be created in the staging schema with the following structure and completed according to the Data Product logic.
CREATE PROCEDURE [staging].[orchestrator]
@pipelineRunId NVARCHAR(100)
AS
SELECT 1
-- TODO: POPULATE WITH LOGIC
RETURN 0
GO
Example of Stored Procedure¶
The following code is just an example illustrating the structure of the orchestrator stored procedure used in the Data Product.
CREATE PROCEDURE [staging].[orchestrator] (@pipelineRunId NVARCHAR(100))
AS
BEGIN
DECLARE @IdPipeline INT = (SELECT TOP 1 IdPipeline FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId)
DECLARE @IdLoad INT;
EXECUTE @IdLoad = [Sidra].[LoadProcessLog] 1, @IdPipeline, NULL, @pipelineRunId;
DECLARE @AssetOrder INT = 1;
DECLARE @MaxAssetOrder INT;
SELECT @MaxAssetOrder = COALESCE(MAX([AssetOrder]), 0) FROM [Sidra].[ExtractPipelineExecution] WHERE [PipelineRunId] = @pipelineRunId
BEGIN TRY
IF @MaxAssetOrder > 0
BEGIN
WHILE (@AssetOrder <= @MaxAssetOrder)
BEGIN
--- YOUR CODE HERE
--- YOUR CODE HERE
--- YOUR CODE HERE
SET @AssetOrder=@AssetOrder + 1
END
EXECUTE [Sidra].[LoadProcessLog] 2, @IdPipeline, @IdLoad, @pipelineRunId;
EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 101;
END
IF @MaxAssetOrder = 0
BEGIN
EXECUTE [Sidra].[LoadProcessLog] 3, @IdPipeline, @IdLoad, @pipelineRunId;
END
END TRY
BEGIN CATCH
DECLARE @Message NVARCHAR(4000) = ERROR_MESSAGE();
DECLARE @ErrorSeverity INT = ERROR_SEVERITY();
DECLARE @ErrorState INT = ERROR_STATE();
EXECUTE [Sidra].[LoadProcessLog] 0, @IdPipeline, @IdLoad, @pipelineRunId;
EXECUTE [Sidra].[UpdateAssetLoadStatus] @pipelineRunId, 102;
RAISERROR (@Message, @ErrorSeverity, @ErrorState);
END CATCH
END
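The `--- YOUR CODE HERE` placeholders in the loop are where the per-Asset business logic goes. As a minimal sketch of what one iteration might do (the table and column names below are illustrative placeholders, not Sidra objects), each pass typically promotes the staging data for the current Asset into its production table:

```sql
-- Hypothetical loop body: [staging].[Customers] and [production].[Customers]
-- are placeholder names, not part of Sidra.
IF @AssetOrder = 1
BEGIN
    -- Promote the rows staged for this Asset into the production table.
    INSERT INTO [production].[Customers] ([CustomerId], [Name], [LoadDate])
    SELECT [CustomerId], [Name], GETUTCDATE()
    FROM [staging].[Customers];

    -- Clear the staging table once the data has been promoted.
    TRUNCATE TABLE [staging].[Customers];
END
```

Each value of @AssetOrder would handle one Entity in this fashion, with the branch (or a dynamic lookup) selecting the corresponding staging and production tables.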
Stored Procedure Templates¶
Sidra provides several generic stored procedures to ease custom data extraction from the Data Product Databricks tables or raw storage (depending on the business logic) into the Data Product database tables or the final production/model tables:
- CreateStagingTable. This stored procedure selects the Attributes data from the Entities of the Data Product Databricks tables or raw storage, applies some conversions depending on the data type (such as sanitizing certain characters), and creates the staging tables, grouped by Entity, in the Data Product database.
- GetNextPendingIdAsset. When executed through the pipeline template deployment, this stored procedure is in charge of detecting the new Assets and loading them into the staging tables in the Data Product database.
- UpsertExtractPipelineExecution. This stored procedure keeps the ExtractPipelineExecution tracking table updated by inserting new records (Asset data) when they do not exist, or updating the current ones when changes have been made. The status of the Data Product pipeline execution is then updated with the values detailed in its table.
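The insert-or-update behavior described for UpsertExtractPipelineExecution corresponds to the standard T-SQL MERGE pattern. A minimal sketch against a hypothetical tracking table (the column set shown is an assumption for illustration, not Sidra's actual schema):

```sql
-- Hypothetical upsert: insert the record when it does not exist,
-- otherwise update the existing one. Column names are assumptions.
MERGE [Sidra].[ExtractPipelineExecution] AS target
USING (SELECT @IdPipeline    AS IdPipeline,
              @pipelineRunId AS PipelineRunId,
              @AssetOrder    AS AssetOrder) AS source
   ON target.IdPipeline = source.IdPipeline
  AND target.PipelineRunId = source.PipelineRunId
WHEN MATCHED THEN
    UPDATE SET target.AssetOrder = source.AssetOrder
WHEN NOT MATCHED THEN
    INSERT (IdPipeline, PipelineRunId, AssetOrder)
    VALUES (source.IdPipeline, source.PipelineRunId, source.AssetOrder);
```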