Data Domain

To enable the transformation and modelling of relational data according to business rules, Sidra provides a Data Product template: the Basic SQL Data Product.

The Data Domain Data Product combines Basic SQL with Databricks to cover additional requirements. These requirements cater to more advanced scenarios, such as:

  • Scenarios where we need to allow custom data querying logic on the raw data lake from the Data Product.
  • Scenarios where we need advanced data aggregation and dependencies logic as views or new staging tables in the Data Product.

Sidra provides a built-in Data Product template as an extension of the Basic SQL Data Product. This template incorporates a dedicated Databricks cluster that runs this custom querying logic to create staging tables, for example as new views.

The main components of this Data Product template are:

  • A client SQL database. This database hosts a reduced version of the metadata tables in Sidra Service (in order to track Assets metadata and ADF metadata), as well as the staging tables and stored procedures to create the final elaborated production tables.
  • A Databricks cluster that is deployed and configured with an orchestration notebook. This notebook orchestrates the required custom data processing logic, for example, querying an Entity and its related Entities to create a staging table as an aggregated view.

Purpose

This Data Product template accelerates the creation of a Data Product by abstracting away the main synchronization elements with Sidra Service.

The Data Product with this template needs to be configured to have the required permissions to access the DSU data.

Once that is in place, the Data Product Sync job, explained here, is responsible for transparently synchronizing the metadata between Sidra Service and the Data Product database. This job also triggers the Data Product pipeline defined to synchronize the actual data to the local Databricks cluster and to create the staging tables.

The actual data flow orchestration is performed by a Data Product pipeline, via a specific instance of Azure Data Factory installed in the Data Product resource group. Related to this orchestrator activity, from version 1.12 (2022.R2) onwards a new option is available to handle the case where the stored procedure is empty, or to create a new template without the orchestrator.

Summary

In summary, the Data Product pipeline performs the following actions:

  • Export the raw data from the DSU to the Data Product storage (raw storage container).
  • Copy the data to the Data Product Databricks as delta tables.
  • Run an orchestrator Databricks notebook that executes the queries configured in StagingConfiguration to create the required staging tables with the needed views or aggregations.
  • Create views as new staging tables in the Data Product database.
  • Run an orchestrator SQL stored procedure that executes business logic to create the final production tables in the Data Product database.

This Data Product is integrated with Sidra: it shares the common security model with Sidra Service and uses Identity Server for authentication.

The end-to-end process looks like the following:

  • Thanks to the execution of the Sync webjob inside the Data Product, the ingested Assets metadata is synchronized with the Sidra Service Assets metadata.
  • A copy of the relevant actual data is stored in raw format in the Data Product storage and as delta tables in the Data Product Databricks.
  • An orchestrator Databricks notebook executes a set of configured queries to create the staging tables in the Data Product database.
  • Further SQL stored procedures create the final production tables for the relevant use case.

High-level installation details

As with any other type of Data Product in Sidra, the process of installing this Data Product consists of the following main steps:

  • A dotnet template is installed, which launches a build and release pipeline in Azure DevOps defined for the Data Labs Data Product.
  • As part of the build and release pipeline for this Data Product, the needed infrastructure is installed. This includes the execution of the Deploy.ps1 and Databricks deployment scripts, as well as the deployment of the different WebJobs.

Build and release are performed with multi-stage pipelines. Once the Data Product template is installed, you have to create the solution using that template, push the changes to Git and configure the build/release pipeline in DevOps to start the deployment.

For more information on these topics you can access this Documentation.

Step-by-step

Create a Data Product from scratch

For more information, check the specific tutorial for creating a Data Product.

Architecture

The Data Domain Data Product resources are contained in two resource groups:

  • One resource group contains all the services used by the Data Product, separated from the Sidra Service and DSU resource groups. The services included in the ARM template for this Data Product are:

    • Client storage account for raw data: used for storing the copy of the data that is extracted from the DSU and to which the Data Product has access.
    • Client storage account for delta tables: used for storing the delta tables, which Databricks uses as external tables. This is an ADLS Gen 2 account.
    • Client Data Factory: used for data orchestration pipelines to bring the data from the DSU, execute the Databricks orchestrator notebook, and copy the elaborated data to the staging tables.
    • Client Database: used for keeping a synchronized copy of the Assets metadata between Sidra Service and the Data Product, and for hosting the relational models and transformation queries and stored procedures.
    • Client Key Vault: used for storing and accessing secrets in a secure way.
  • The other resource group is a managed resource group that is created automatically with each Databricks service. Its name uses the name of the above resource group as a prefix, plus a suffix ending in -dbr. A number of resources are created inside this managed resource group, such as virtual machines, disks and network interfaces.

Basic-SQL-Databricks

Besides the Azure infrastructure, several WebJobs are also deployed for the Basic Data Product. They are responsible for the background tasks of data and metadata synchronization:

  • Sync webjob
  • DatabaseBuilder webjob
  • DatafactoryManager webjob

You can find more information about these jobs on this page.

Data Product pipelines

The pipeline template for extracting data to storage and executing a Databricks notebook, with ItemId 202BDE4E-1465-4461-9A86-492DBFFF9312, can be used by the PI Data Product template.

This pipeline template performs the following steps:

  • Step 1: The list of Entities to query and export from the Data Lake (DSU) is retrieved. This list will also be used for creating the tables in the Data Product Databricks.
  • Step 2: If any Entities require tables to be created in the Data Product Databricks, a notebook called Create Tables in the Data Product Databricks is executed.
  • Step 3: For each Entity to export, the data is first copied from the DSU into the Data Product storage (in raw format), through a call to the /query Sidra Service API endpoint.
  • Step 4: For each Entity to export, the data is copied to the Data Product Databricks as delta tables (a notebook called LoadDataInDatabricks is executed for this data operation).
  • Step 5: An entry point notebook, or orchestration notebook, whose name is passed as a parameter to the pipeline, is executed. Here the user is able to add any custom querying logic for aggregating and correlating any data that has been made available as Databricks delta tables. The queries to execute in this notebook will be read from the configured data on the StagingConfiguration Data Product table.
  • Step 6: Another Databricks notebook, called ExtractToStagingTables, is executed to load the data into the SQL staging tables, therefore making it available for further processing.
  • Step 7: The orchestrator SQL stored procedure is called. This stored procedure executes business logic to read from the staging tables and create the final production data models.
  • Step 8: Finally, the Asset status is changed to ImportedFromDataStorageUnit, or status 3, meaning that the Asset has been imported from the Data Storage Unit into the Data Product database.

The parameters for using this pipeline are:

  • storedProcedureName: The SQL stored procedure that is invoked after the content of all the Entities has been loaded into the staging tables. This stored procedure needs to be created before deploying a pipeline from this pipeline template. The default value is [Staging].[Orchestrator].
  • notebookOrchestrator: The path of the notebook that executes the queries configured in the StagingConfiguration table. The notebook must be created and uploaded to the Databricks instance beforehand.

For example, the ExecutionParameters section will be:

{
    "notebookOrchestrator": "/Shared/MyNotebook",
    "storedProcedureName": "[staging].[Orchestrator]" 
}
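
As an illustration only, the ExecutionParameters value could be assembled programmatically as in the following Python sketch. The helper name build_execution_parameters is hypothetical and not part of Sidra; only the two parameter names and the default stored procedure come from the description above. The check that the notebook path starts with "/" is an assumption based on how Databricks workspace paths are usually written.

```python
import json

# Default value taken from the storedProcedureName description above.
DEFAULT_STORED_PROCEDURE = "[Staging].[Orchestrator]"


def build_execution_parameters(notebook_orchestrator, stored_procedure_name=None):
    """Return the ExecutionParameters section as JSON text.

    Hypothetical helper: it is not a Sidra API, just a way to show the
    shape of the two parameters and the documented default.
    """
    # Assumption: notebook paths are absolute workspace paths such as /Shared/MyNotebook.
    if not notebook_orchestrator or not notebook_orchestrator.startswith("/"):
        raise ValueError("notebookOrchestrator must be an absolute notebook path")
    params = {
        "notebookOrchestrator": notebook_orchestrator,
        "storedProcedureName": stored_procedure_name or DEFAULT_STORED_PROCEDURE,
    }
    return json.dumps(params, indent=4)


print(build_execution_parameters("/Shared/MyNotebook"))
```

When storedProcedureName is omitted, the sketch falls back to the documented default rather than leaving the key out, so the resulting JSON always contains both parameters.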

New Data Product tables

StagingConfiguration table

Additionally, this Data Product contains a new table, called StagingConfiguration, which contains the configuration of the new table names in Staging that need to be created, as well as the queries to create these new tables.

Column | Description
--- | ---
IdPipeline | Id of the pipeline for the extraction of the data from the DSU to the staging tables (mandatory).
Query | Query that defines how to retrieve the data from the Databricks of the Data Product. If it is not defined, an IdEntity must be specified so that the content is retrieved by a default query. IdEntity and Query cannot both be NULL at the same time.
IdEntity | Entity identifier used to retrieve the data from the Databricks of the Data Product. By default, all Attributes of the Entity are retrieved. If it is not defined, a Query must be specified to define how to retrieve the data. IdEntity and Query cannot both be NULL at the same time.
SchemaName | Schema name used for the staging tables. By default, the Staging schema is used.
TableName | Table name for the staging table. If it is not defined, a combination of the name of the Provider and the table name of the Entity is used.
TableFieldDefinition | Table field definition for the staging table, e.g., [SalesOrderID] [int] NULL,[OrderDate] [datetime] NULL,[SalesOrderNumber] [nvarchar](25) NULL,[UnitPrice] [money] NULL. If it is not defined, the whole list of Attributes of the Entity is used. TableFieldDefinition and IdEntity cannot both be NULL at the same time.

The StagingConfiguration table is needed in order to parametrize these queries in the Databricks notebook.
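
The NULL rules in the column descriptions above can be checked before a row is inserted. The following Python sketch is only an illustration of those rules: the StagingConfigurationRow class and the validate function are hypothetical names, and the integer type used for the identifiers is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StagingConfigurationRow:
    """In-memory stand-in for one StagingConfiguration row (hypothetical)."""
    id_pipeline: Optional[int]  # assumption: identifiers are integers
    query: Optional[str] = None
    id_entity: Optional[int] = None
    schema_name: Optional[str] = None  # NULL means the Staging schema is used
    table_name: Optional[str] = None
    table_field_definition: Optional[str] = None


def validate(row: StagingConfigurationRow) -> List[str]:
    """Return the list of rule violations; an empty list means the row is acceptable."""
    errors = []
    if row.id_pipeline is None:
        errors.append("IdPipeline is mandatory")
    if row.query is None and row.id_entity is None:
        errors.append("IdEntity and Query cannot both be NULL")
    if row.table_field_definition is None and row.id_entity is None:
        errors.append("TableFieldDefinition and IdEntity cannot both be NULL")
    return errors


# Default configuration: only IdPipeline and IdEntity are set (illustrative values).
print(validate(StagingConfigurationRow(id_pipeline=1, id_entity=10)))
# Custom query without TableFieldDefinition and without IdEntity: one violation.
print(validate(StagingConfigurationRow(id_pipeline=1, query="SELECT 1")))
```

Returning a list of messages instead of raising on the first problem makes it possible to report every violated rule for a row at once.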

AssetTimeTravel table

Also, there is another table, called AssetTimeTravel. This table is populated by Sidra with the information of Time Travel provided by Client Databricks when the data of the Assets is inserted in Client Databricks. Please check Databricks documentation for more information on the Time Travel functionality.

Column | Description
--- | ---
Id | Id of the Asset where the Time Travel definition applies.
VersionNumber | Version number of the Time Travel given by Databricks.
Timestamp | Timestamp of the Time Travel given by Databricks.
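
As a rough illustration of how rows with these three columns might be consulted, the following Python sketch picks the most recent Time Travel timestamp among a set of Assets. The AssetTimeTravel tuple, the latest_timestamp function and the sample values are all hypothetical; the actual lookup performed by Sidra is not described here and may differ.

```python
from datetime import datetime
from typing import Iterable, List, NamedTuple


class AssetTimeTravel(NamedTuple):
    """One row of the AssetTimeTravel table (columns as documented above)."""
    id: int
    version_number: int
    timestamp: datetime


def latest_timestamp(rows: List[AssetTimeTravel], asset_ids: Iterable[int]) -> datetime:
    """Return the most recent Timestamp among the rows of the given Assets."""
    wanted = set(asset_ids)
    candidates = [row.timestamp for row in rows if row.id in wanted]
    if not candidates:
        raise LookupError("no Time Travel information for the requested Assets")
    return max(candidates)


# Illustrative rows only; real values are written by Sidra.
rows = [
    AssetTimeTravel(1399, 3, datetime(2021, 12, 1, 9, 0, 0)),
    AssetTimeTravel(1402, 4, datetime(2021, 12, 2, 11, 38, 42)),
    AssetTimeTravel(2000, 9, datetime(2022, 1, 1, 0, 0, 0)),
]
print(latest_timestamp(rows, [1399, 1402]))
```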

Step-by-step

How to handle dependent Entities through Databricks app

For a detailed example on how to use these tables and the general functioning of the PI Data Product, please check this tutorial.

How to configure the queries to create custom staging tables

The PI Data Product uses the StagingConfiguration table to store the queries that are to be run on the Databricks of the Data Product. When it comes to configuring the queries for creating the staging tables, we can find two different scenarios:

Scenario 1. Create a staging table for an Entity using the default configuration

In this first scenario, we use the default configuration of the StagingConfiguration table to configure the creation of the staging tables. From the reference table above, which describes each field, we see that we only need to configure the IdEntity field:

INSERT INTO [Sidra].[StagingConfiguration] ([IdPipeline], [Query], [IdEntity], [SchemaName], [TableName], [TableFieldDefinition]) VALUES
(@IdPipelineDSUExtraction, NULL, @IdEntity, NULL, NULL, NULL)

As you will notice, the Query field is NULL, which means that the default query is executed. This default query simply retrieves all the Attributes of the Entity given in the IdEntity field.

When the default query is executed by the underlying process in the Data Product, the result is the following:

  • A staging table is created, with Staging as schema. The table name is a combination of the Provider name and Entity name, e.g. [Staging].[Support_support_SalesLT_Product]
  • This Staging table is populated with data, by executing this query.
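
The default naming can be pictured with a small Python sketch. The function default_staging_table is a hypothetical name, and joining the Provider name and the Entity table name with an underscore is an assumption inferred from the single example above, not a documented rule.

```python
def default_staging_table(provider_name, entity_table_name, schema_name=None):
    """Build the bracketed staging table name used when TableName is NULL.

    Assumption: Provider name and Entity table name are joined with "_",
    as suggested by [Staging].[Support_support_SalesLT_Product].
    """
    schema = schema_name or "Staging"  # Staging is the documented default schema
    return f"[{schema}].[{provider_name}_{entity_table_name}]"


print(default_staging_table("Support", "support_SalesLT_Product"))
```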

Below is an example of the generic query executed by the underlying process that the Data Product triggers:

SELECT ProductID,Name,LoadDate,SidraIdAsset 
FROM (
SELECT ProductID,Name,LoadDate,SidraIdAsset FROM support.support_SalesLT_Product TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_Product>>"
) A WHERE A.SidraIdAsset IN (<<ASSETID.support.support_SalesLT_Product>>)

where:

  • <<TIMESTAMP.??>> and <<ASSETID.??>> are the placeholders that are filled in by the process, using the list of the Assets that the pipeline is loading.
  • <<TIMESTAMP.support.support_SalesLT_Product>>: The process locates the timestamp for the most recent Asset represented by the Provider Support and the table name support_SalesLT_Product (configured Entity), to ensure that the data is up to date.
  • <<ASSETID.support.support_SalesLT_Product>>: The process substitutes this with all Asset identifiers for the Provider Support, and the table name support_SalesLT_Product, to filter the data that is loaded by the pipeline.

Once processed, the above query would be translated to this query:

SELECT ProductID,Name,LoadDate,SidraIdAsset 
FROM (
SELECT ProductID,Name,LoadDate,SidraIdAsset FROM support.support_SalesLT_Product TIMESTAMP AS OF "2021-12-02 11:38:42.0000000"
) A WHERE A.SidraIdAsset IN (1399,1402,1409,1416)
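
The translation from the templated query to the final query can be sketched in Python as a regular-expression substitution. This is a minimal illustration, not the code that Sidra runs: the function fill_placeholders and the two lookup dictionaries are hypothetical, and the shortened query below only reuses the table name, timestamp and Asset identifiers from the example above.

```python
import re

# Matches <<TIMESTAMP.{database}.{table}>> and <<ASSETID.{database}.{table}>>.
PLACEHOLDER = re.compile(r"<<(TIMESTAMP|ASSETID)\.([^.<>]+)\.([^.<>]+)>>")


def fill_placeholders(query, timestamps, asset_ids):
    """Replace the placeholders in `query` with concrete values.

    `timestamps` maps (database, table) to a timestamp string and
    `asset_ids` maps (database, table) to a list of Asset identifiers.
    Both lookups are assumptions about how the values are supplied.
    A KeyError is raised if a referenced table has no entry.
    """

    def replace(match):
        kind, database, table = match.groups()
        if kind == "TIMESTAMP":
            return timestamps[(database, table)]
        return ",".join(str(asset_id) for asset_id in asset_ids[(database, table)])

    return PLACEHOLDER.sub(replace, query)


template = (
    "SELECT ProductID,Name,LoadDate,SidraIdAsset FROM support.support_SalesLT_Product "
    'TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_Product>>" '
    "WHERE SidraIdAsset IN (<<ASSETID.support.support_SalesLT_Product>>)"
)
key = ("support", "support_SalesLT_Product")
filled = fill_placeholders(
    template,
    {key: "2021-12-02 11:38:42.0000000"},
    {key: [1399, 1402, 1409, 1416]},
)
print(filled)
```

Keying both dictionaries by the (database, table) pair mirrors the structure of the placeholder itself, so the same function also handles queries that reference several Entities.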

Scenario 2. Create a custom staging table using the combination of two Entities

In the second scenario, we are not using the default configuration mode for the StagingConfiguration table. This means that we need to populate the following fields: Query, TableName and TableFieldDefinition.

INSERT INTO [Sidra].[StagingConfiguration] ([IdPipeline], [Query], [IdEntity], [SchemaName], [TableName], [TableFieldDefinition]) VALUES
(@IdPipelineDSUExtraction
, 'SELECT H.SalesOrderId, H.OrderDate, H.SalesOrderNumber, H.ShipMethod, H.SubTotal, D.SalesOrderDetailId, D.OrderQty, D.UnitPrice, D.LineTotal
   FROM (SELECT * FROM support.support_SalesLT_SalesOrderHeader TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderHeader>>") H
   LEFT JOIN (SELECT * FROM support.support_SalesLT_SalesOrderDetail TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderDetail>>") D ON H.SalesOrderId = D.SalesOrderId 
   WHERE H.SidraIdAsset IN (<<ASSETID.support.support_SalesLT_SalesOrderHeader>>)'
, NULL 
, NULL
, 'support_SalesLT_SalesOrderHeaderWithDetails'
, '[SalesOrderID] [int] NULL,[OrderDate] [datetime] NULL,[SalesOrderNumber] [nvarchar](25) NULL,[ShipMethod] [nvarchar](50) NULL,[SubTotal] [money] NULL,[SalesOrderDetailID] [int] NULL,[OrderQty] [smallint] NULL,[UnitPrice] [money] NULL,[LineTotal] [numeric](18, 0) NULL')

When this query is executed by the underlying process in the Data Product, the result is the following:

  • A staging table is created, with Staging as a schema (default when the value SchemaName is not specified). The table name is specified by the value in TableName, and the columns for the table are defined by the TableFieldDefinition column.
  • The staging table is populated with data, by executing the query defined in the Query column:

SELECT H.SalesOrderId, H.OrderDate, H.SalesOrderNumber, H.ShipMethod, H.SubTotal, D.SalesOrderDetailId, D.OrderQty, D.UnitPrice, D.LineTotal
FROM (SELECT * FROM support.support_SalesLT_SalesOrderHeader TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderHeader>>") H 
LEFT JOIN (SELECT * FROM support.support_SalesLT_SalesOrderDetail TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderDetail>>") D ON H.SalesOrderId = D.SalesOrderId 
WHERE H.SidraIdAsset IN (<<ASSETID.support.support_SalesLT_SalesOrderHeader>>)

where:

  • <<TIMESTAMP.??>> and <<ASSETID.??>> are placeholders that are filled in by the process, according to the Assets that the pipeline is loading.
  • For the creation of custom queries, the available placeholders are:

    • <<TIMESTAMP.{provider-databaseName}.{entity-tableName}>>
    • <<ASSETID.{provider-databaseName}.{entity-tableName}>>
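
Before storing a custom query in StagingConfiguration, it can help to list the placeholders it uses and to spot ones that are not well formed, such as a missing closing >>. The Python sketch below is one possible check. The function names are hypothetical, and the malformed-placeholder test assumes that the query does not use << or >> for any other purpose.

```python
import re

# A well-formed placeholder: <<TIMESTAMP.{database}.{table}>> or <<ASSETID.{database}.{table}>>.
WELL_FORMED = re.compile(r"<<(TIMESTAMP|ASSETID)\.([^.<>\s]+)\.([^.<>\s]+)>>")


def find_placeholders(query):
    """Return the distinct (kind, database, table) placeholders, sorted."""
    return sorted(set(WELL_FORMED.findall(query)))


def has_malformed_placeholder(query):
    """Return True if "<<" or ">>" remains after removing well-formed placeholders.

    Assumption: the query does not use << or >> for anything else.
    """
    leftover = WELL_FORMED.sub("", query)
    return "<<" in leftover or ">>" in leftover


custom_query = (
    "SELECT H.SalesOrderId, D.OrderQty "
    'FROM (SELECT * FROM support.support_SalesLT_SalesOrderHeader TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderHeader>>") H '
    'LEFT JOIN (SELECT * FROM support.support_SalesLT_SalesOrderDetail TIMESTAMP AS OF "<<TIMESTAMP.support.support_SalesLT_SalesOrderDetail>>") D '
    "ON H.SalesOrderId = D.SalesOrderId "
    "WHERE H.SidraIdAsset IN (<<ASSETID.support.support_SalesLT_SalesOrderHeader>>)"
)
for placeholder in find_placeholders(custom_query):
    print(placeholder)
print(has_malformed_placeholder(custom_query))
```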

Last update: 2024-02-23