Data Intake Process of CSV files from the SharePoint Online Library connector
When bringing data in CSV format from a SharePoint source into Databricks with Sidra, there are two stages:
- Intake: Copy the CSV files from the SharePoint collection to the Staging area of Sidra.
- Ingestion: Have Sidra transfer the data from the CSV files in the Staging area into its final Databricks table(s).
In this tutorial, we cover:
- Step 1. Intake: setting up the SharePoint Connector Plugin to copy the files from SharePoint to Sidra's `Stage` Azure Storage Account, under its `landing` container, by following the Data Intake Process wizard in the Sidra web UI.
- Step 2. Ingestion: using some helper scripts to configure the metadata that enables the final data ingestion in Databricks.

Step 2 mainly consists in declaring the metadata, that is, the description of the data in the CSV files. Sidra needs this metadata to transfer the records into Databricks. Step 2 is documented in CSV File Data Intake Process.
The ingestion stage from Step 2 expects the CSV files to be placed as blobs:
- In a `/<Provider-name>/<Entity-name>/` folder under the `landing` container of Sidra's `Stage` Azure Storage Account;
- With names matching the `[RegularExpression]` field of the Entity metadata entry;
- With a name or path denoting the date of the Asset, that is, the set of records to be ingested (see the example path right after this list).
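For example, assuming the illustrative Provider and Entity names used in the script further below (hypothetical names, not a requirement), a correctly placed blob inside the `landing` container could look like:

    JohnDoeSPLibPlusFileProvider03/JohnDoeFunBikes/JohnDoe_2022-03-01_10-30-00.csv

Here the file name matches the `[RegularExpression]` defined for the Entity and encodes the date and time of the Asset.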
Considering the above, the SharePoint plugin will not configure a complete Data Intake Process, but rather just a data extraction: a copy Data Factory pipeline from the SharePoint source system into Sidra's `Stage` area, in the needed `landing` container, with the expected blob path and name.
Make sure that the name of the SharePoint library does not contain any spaces before starting the Data Intake Process.
Setup
Step 1. Sidra Web Manager
- Follow the SharePoint Online Library Intake Process, taking note of the Provider Name and Entity Name(s).
- Complete the `Configure Data Source` option with the required configuration fields. Make sure that the `Default` option in the `Integration Runtime` field starts with an upper-case letter.

  Info. Bear in mind the following:

  - `Service Principal ID` and `Service Principal Key` correspond to `Client ID` and `Client Secret` in this guide about how to get them.
  - Following the guide from the official documentation, once you have created the App permissions, you will be redirected to a "Site Collection App Permissions" page where you can identify your Tenant ID easily (highlighted in the image below).
- In the Configure Metadata Extractor tab, select `Other existing container` as the `Destination Path`, and in the `Please specify a destination path` dropdown select `landing`. Finally, specify your Entity Name(s) in the Entity list, selecting ".csv" as the format. The `Source Path` is the folder created in the SharePoint directory. Note that the Filter Expression field is intended to select a certain set of files to extract and associate with each Entity.
- Configure the trigger.
- Confirm the process. The connection configuration will be tested for a few seconds. After testing, a confirmation message will appear with the success or error details.
Step 2. Define the ingestion metadata
The main purpose of the metadata definition is to let Sidra know which fields to expect in the CSV files, with which separator and which data types. The metadata definition is more comprehensively illustrated in CSV File Data Intake Process; below is a quick start:
- Run the script below for each Entity you have specified in the SharePoint Intake.
- Replace `@Provider_Name` with your Provider and `@Entity_Name` with your Entity. You may also need to review the field delimiter, the regular expression matching your file naming convention, etc.
- In the `#Entity_Attributes` temporary table, specify the columns of your CSV file with their data types (a hypothetical header example follows this list).
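As a purely hypothetical illustration, a tab-delimited CSV file matching the sample `#Entity_Attributes` definition used in the script below would start with a header line such as:

    product_id<TAB>product_name<TAB>brand_name<TAB>category_name<TAB>model_year<TAB>list_price

where `<TAB>` stands for the tab character configured as `@Entity_Delimiter` ('\u0009'). Each column becomes one row in `#Entity_Attributes`, with its position in `[Order]`, a Hive type and a SQL type.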
The following script is based on the CSV Intake Script in Sidra Docs. You can download the full SQL script here and the CSV file here.
/**** This script is run after the creation of the SharePoint Doc Library ingestion in the Sidra Web UI ****/
DECLARE @Provider_Name VARCHAR(30) = 'JohnDoeSPLibPlusFileProvider03'
DECLARE @Provider_Owner NVARCHAR(256) = N'[email protected]'
DECLARE @Provider_Description NVARCHAR(MAX) = N'Intake a Comma-Separated values file.'
DECLARE @SidraInstallation VARCHAR(4) = 'Dev';
DECLARE @Dsu_ResourceGroupName VARCHAR(50) = CONCAT('Sidra.DSU.Default.',@SidraInstallation)
DECLARE @Entity_Name VARCHAR(MAX) = 'JohnDoeFunBikes'
DECLARE @Entity_RegularExpression NVARCHAR(MAX) = '^JohnDoe_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv'
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009' -- => TAB char, '\t' --Others: '\u002C'=','
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8'
DECLARE @Entity_HeaderLines INT = 1
DECLARE @Entity_Format VARCHAR(10) = 'csv'
DECLARE @IdTableFormat INT = 3
DECLARE @StartValidDate DATE = '2022-03-01'
DROP TABLE IF EXISTS #Entity_Attributes
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30))
INSERT INTO #Entity_Attributes ([Order], [Name], [HiveType], [SqlType])
VALUES
(1, 'product_id', 'INT', 'INT')
,(2, 'product_name', 'STRING', 'NVARCHAR(128)')
,(3, 'brand_name', 'STRING', 'NVARCHAR(32)')
,(4, 'category_name', 'STRING', 'NVARCHAR(64)')
,(5, 'model_year', 'INT', 'SMALLINT')
,(6, 'list_price', 'DECIMAL(10, 2)', 'DECIMAL(10, 2)')
-------------------------------------------------------------------------------
SET NOCOUNT ON;
BEGIN TRAN CreateEntity
DECLARE @CurrentDate DATETIME2 = GETUTCDATE()
DECLARE @Pipeline_ItemId UNIQUEIDENTIFIER =
(SELECT TOP(1) P.[ItemId] FROM [DataIngestion].[Pipeline] P WHERE [Name] = 'FileIngestionDatabricks' order by P.ItemId desc)
-- This should be: F8CD4AD9-DCAE-4463-9185-58292D14BE99
print(@Pipeline_ItemId)
DECLARE @Pipeline_Id INT = (SELECT TOP (1) P.[Id] FROM [DataIngestion].[Pipeline] P WHERE P.[ItemId] = @Pipeline_ItemId)
-- The readerOptions "header" value is a JSON boolean; derive it from @Entity_HeaderLines (assumes any value > 0 means the file has a header row)
DECLARE @Entity_HeaderLineString VARCHAR(5) = IIF(@Entity_HeaderLines > 0, 'true', 'false')
DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']","readerOptions": {"delimiter": "',@Entity_Delimiter,'","header": ',@Entity_HeaderLineString,',"encoding": "',@Entity_Encoding,'"}}');
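-- With the sample values above, @Entity_Properties resolves to:
-- {"sourceTableName": "[staging].[JohnDoeFunBikes]","readerOptions": {"delimiter": "\u0009","header": true,"encoding": "UTF-8"}}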
DECLARE @Provider_DatabaseName VARCHAR(MAX) = LOWER(@Provider_Name)
DECLARE @Dsu_Id INT
DECLARE @Dsu_SecurityPath VARCHAR(10)
SELECT
@Dsu_Id = [Id]
,@Dsu_SecurityPath = [SecurityPath]
FROM [DataIngestion].[DataStorageUnit]
WHERE [ResourceGroupName] = @Dsu_ResourceGroupName
-------------------------------------------------------------------------------
-- Get Provider
-------------------------------------------------------------------------------
DECLARE @Provider_Id INT = (SELECT TOP 1 [Id] FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)
print(@Provider_Id)
-------------------------------------------------------------------------------
-- Get and Update Entity
-------------------------------------------------------------------------------
BEGIN
DECLARE @Entity_Id INT = (SELECT TOP 1 Id FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)
print(@Entity_Id)
--DECLARE @Entity_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id, '/', @Provider_Id)
UPDATE [DataIngestion].[Entity]
SET RegularExpression = @Entity_RegularExpression,
    [Encoding] = @Entity_Encoding,
    HeaderLines = @Entity_HeaderLines,
    FieldDelimiter = @Entity_Delimiter,
    [Format] = @Entity_Format,
    IdTableFormat = @IdTableFormat,
    StartValidDate = @StartValidDate
WHERE Id = @Entity_Id
END;
-------------------------------------------------------------------------------
-- Create Attribute
-------------------------------------------------------------------------------
BEGIN
DECLARE @Attribute_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id,'/',@Provider_Id,'/',@Entity_Id)
--DELETE FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id
INSERT INTO [DataIngestion].[Attribute]
([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
SELECT @Entity_Id, [Name], [HiveType], NULL, 0, 0, 0, NULL, NULL, NULL, 0, 0, [Order], 0, 0, 0, [SqlType], NULL, NULL, [Name], @Attribute_ParentSecurityPath, 0, NULL, NEWID()
FROM #Entity_Attributes
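-- Add the calculated metadata attributes that accompany the CSV columns (LoadDate, HasErrors, SidraIsDeleted, IdSourceItem)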
INSERT INTO [DataIngestion].[Attribute]
([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
VALUES
(@Entity_Id, 'LoadDate', 'STRING', NULL, 0, 0, 0, NULL, NULL, 'FROM_UNIXTIME(UNIX_TIMESTAMP())', 0, 0, 500, 1, 0, 1, 'datetime2(7)', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'HasErrors', 'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'FALSE', 0, 0, 501, 1, 0, 1, 'bit', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'SidraIsDeleted', 'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'SidraIsDeleted', 0, 0, 502, 1, 0, 1, 'bit', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'IdSourceItem', 'INT', NULL, 0, 0, 0, NULL, NULL, 'IdSourceItem', 0, 0, 503, 1, 1, 1, 'int', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
END;
-------------------------------------------------------------------------------
-- Relate with pipeline
-------------------------------------------------------------------------------
BEGIN
INSERT INTO [DataIngestion].[EntityPipeline] ([IdEntity], [IdPipeline])
SELECT [IdEntity], [IdPipeline]
FROM
(
SELECT @Entity_Id [IdEntity], @Pipeline_Id [IdPipeline]
EXCEPT
SELECT [IdEntity], [IdPipeline] FROM [DataIngestion].[EntityPipeline]
) AS x
END;
-------------------------------------------------------------------------------
-- Check results
-------------------------------------------------------------------------------
SET NOCOUNT OFF;
PRINT('-------------------------------------------------------------------------------')
SELECT * FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name
SELECT * FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name
SELECT * FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[EntityPipeline] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[Asset] a WHERE [IdEntity] = @Entity_Id
COMMIT TRAN CreateEntity