
Data Intake Process of CSV files from the SharePoint Online Library connector

Bringing CSV data from a SharePoint source into Databricks with Sidra involves two stages:

  1. Intake: Copy the CSV files from the SharePoint collection to the Staging area of Sidra.
  2. Ingestion: Sidra transfers the CSV data from the Staging area into its final Databricks table(s).

In this tutorial, we cover:

  • Step 1. Intake: setting up the SharePoint Connector Plugin to copy the files from SharePoint to Sidra's Stage Azure Storage Account, under its landing container, by following the Data Intake Process wizard in Sidra Web.
  • Step 2. Ingestion: offering some helper scripts to configure the metadata that enables the final data ingestion in Databricks.

Step 2 mainly consists of declaring the metadata, that is, the description of the data in the CSV files. Sidra needs this metadata to transfer the records into Databricks. Step 2 is documented in CSV File Data Intake Process.

The ingestion stage from Step 2 expects the CSV files to be stored as blobs:

  • In a /<Provider-name>/<Entity-name>/ folder under the landing container of the Sidra's Stage Azure Storage Account;
  • With names matching the [RegularExpression] field of the Entity metadata entry;
  • With a name or path that denotes the date of the Asset, that is, the set of records to be ingested.
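
The three expectations above can be checked locally before any file is copied. The following is a minimal sketch, not part of Sidra: it assumes the Entity regular expression uses .NET-style named groups (as in the metadata script later in this tutorial), which Python's `re` module only accepts once rewritten as `(?P<name>...)`. The helper names `to_python_regex`, `landing_blob_path` and `asset_date` are hypothetical, and the sample values are modeled on that script.

```python
import re
from datetime import date

# Example values modeled on the metadata script in this tutorial.
PROVIDER_NAME = "JohnDoeSPLibPlusFileProvider03"
ENTITY_NAME = "JohnDoeFunBikes"
ENTITY_REGEX = (
    r"^JohnDoe_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))"
    r"_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2}))\.csv"
)


def to_python_regex(dotnet_pattern: str) -> str:
    """Rewrite .NET named groups (?<name>...) as Python's (?P<name>...).

    Lookbehinds, (?<=...) and (?<!...), are left untouched.
    """
    return re.sub(r"\(\?<(?![=!])", "(?P<", dotnet_pattern)


def landing_blob_path(provider: str, entity: str, file_name: str) -> str:
    """Expected blob path inside the landing container."""
    return f"/{provider}/{entity}/{file_name}"


def asset_date(file_name: str, dotnet_pattern: str = ENTITY_REGEX):
    """Return the Asset date encoded in the file name, or None if it does not match."""
    match = re.match(to_python_regex(dotnet_pattern), file_name)
    if match is None:
        return None
    return date(int(match["year"]), int(match["month"]), int(match["day"]))
```

For example, `asset_date("JohnDoe_2022-03-01_10-30-00.csv")` yields the date 2022-03-01, while a file name that does not follow the convention yields `None` and would not be picked up for that Entity.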

Given the above, the SharePoint plugin does not configure a complete Data Intake Process, only the data extraction: a Data Factory copy pipeline from the SharePoint source system into Sidra's Stage area, writing to the landing container with the expected blob path and name.

SharePoint Document Library

SharePoint selection of connector

Sidra ingestion 1

Sidra ingestion 2

Make sure that the name of the SharePoint library does not contain any spaces before starting the Data Intake Process.

Setup

Step 1. Sidra Web Manager

  1. Follow the SharePoint Online Library Intake Process while taking note of the Provider Name and Entity Name(s).

    Data Intake Options field

    UI Sidra - Provider and Entity

  2. Complete the Configure Data Source option with the required configuration fields. Make sure that the Default option in the Integration Runtime field is written with a capital first letter.

    Configure Data Source field

    Info

    Bear in mind the following:

    • Service Principal ID and Service Principal Key correspond to the Client ID and Client Secret described in this guide on how to obtain them.
    • Following the guide from the official documentation, once you have created the App permissions, you will be redirected to a "Site Collection App Permissions" page where you can easily identify your Tenant ID (coloured in the image below).

    Data Intake Options field

  3. In the Configure Metadata Extractor tab, select Other existing container as the Destination Path, and select landing in the Please specify a destination path dropdown. Then specify your Entity Name(s) in the Entity list, selecting “.csv” as the format. The Source Path is the folder created in the SharePoint directory. The Filter Expression field selects the set of files to extract and to associate with each Entity.

    Configure metadata extractor

  4. Configure the trigger.

    Configure trigger

  5. Confirm the process. The connection configuration will be tested for a few seconds. After testing, a confirmation message appears with the success or error details.

    Configure metadata extractor

Step 2. Define the ingestion metadata

The main purpose of the metadata definition is to tell Sidra which fields to expect in the CSV files, with which separator and which data types. The metadata definition is covered more comprehensively in CSV File Data Intake Process; a quick start follows:

  1. Run the script below for each Entity you have specified in the SharePoint Intake.

  2. Replace the @Provider_Name with your Provider and the @Entity_Name with your Entity.
    You may need to review the field delimiter, the RegEx according to your file naming convention, etc.

  3. In the #Entity_Attributes temporary table, specify the columns of your CSV file, with the data type.
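
Before filling in #Entity_Attributes, it can help to compare the header row of a sample file with the column list you plan to declare. The following is a minimal sketch under stated assumptions: the file is tab-delimited with one header line (the defaults used in the script in this section), and the column names are those of the sample script. The helper `check_header` is hypothetical and is not part of Sidra.

```python
import csv
import io

# Column names in the order they will be declared in #Entity_Attributes.
EXPECTED_COLUMNS = (
    "product_id", "product_name", "brand_name",
    "category_name", "model_year", "list_price",
)


def check_header(csv_text: str, delimiter: str = "\t", expected=EXPECTED_COLUMNS):
    """Compare the first line of the CSV text with the expected column names.

    Returns a list of human-readable problems; an empty list means the
    header matches in both names and order.
    """
    reader = csv.reader(io.StringIO(csv_text), delimiter=delimiter)
    header = next(reader, [])
    problems = []
    if len(header) != len(expected):
        problems.append(f"expected {len(expected)} columns, found {len(header)}")
    for position, (found, wanted) in enumerate(zip(header, expected), start=1):
        if found.strip() != wanted:
            problems.append(f"column {position}: expected '{wanted}', found '{found}'")
    return problems
```

A mismatch in the column count usually points to a wrong delimiter, so it is worth checking that value first when the list of problems is long.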

The following script is based on the CSV Intake Script in Sidra Docs. You can download the full SQL script here and the CSV file here.

/**** This Script is run after the Creation of the Sharepoint Doc Library Ingestion in the Sidra Web UI
****/
DECLARE @Provider_Name VARCHAR(30) = 'JohnDoeSPLibPlusFileProvider03'
DECLARE @Provider_Owner NVARCHAR(256) = N'[email protected]'
DECLARE @Provider_Description NVARCHAR(MAX) = N'Intake a Comma-Separated values file.'

DECLARE @SidraInstallation VARCHAR(4) = 'Dev';
DECLARE @Dsu_ResourceGroupName VARCHAR(50) = CONCAT('Sidra.DSU.Default.',@SidraInstallation)

DECLARE @Entity_Name VARCHAR(MAX) = 'JohnDoeFunBikes'
DECLARE @Entity_RegularExpression NVARCHAR(MAX) = N'^JohnDoe_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2}))\.csv' -- '\.' matches a literal dot
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009' -- => TAB char, '\t' --Others: '\u002C'=','
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8'
DECLARE @Entity_HeaderLines INT = 1
DECLARE @Entity_Format VARCHAR(10) = 'csv'
DECLARE @IdTableFormat INT = 3
DECLARE @StartValidDate DATE = '2022-03-01'

DROP TABLE IF EXISTS #Entity_Attributes
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30))
INSERT INTO #Entity_Attributes ([Order], [Name], [HiveType], [SqlType])
VALUES
     (1,    'product_id',       'INT',              'INT')
    ,(2,    'product_name',     'STRING',           'NVARCHAR(128)')
    ,(3,    'brand_name',       'STRING',           'NVARCHAR(32)')
    ,(4,    'category_name',    'STRING',           'NVARCHAR(64)')
    ,(5,    'model_year',       'INT',              'SMALLINT')
    ,(6,    'list_price',       'DECIMAL(10, 2)',   'DECIMAL(10, 2)')

-------------------------------------------------------------------------------

SET NOCOUNT ON;
BEGIN TRAN CreateEntity

DECLARE @CurrentDate DATETIME2 = GETUTCDATE()

DECLARE @Pipeline_ItemId UNIQUEIDENTIFIER = 
    (SELECT TOP(1) P.[ItemId] FROM [DataIngestion].[Pipeline] P WHERE [Name] = 'FileIngestionDatabricks' order by P.ItemId desc)
    -- This should be: F8CD4AD9-DCAE-4463-9185-58292D14BE99
print(@Pipeline_ItemId)
DECLARE @Pipeline_Id INT = (SELECT TOP (1) P.[Id] FROM [DataIngestion].[Pipeline] P WHERE P.[ItemId] = @Pipeline_ItemId)

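-- @Entity_HeaderLineString was referenced without being declared.
-- Assumption: the reader option "header" is a JSON boolean derived from the header-line count.
DECLARE @Entity_HeaderLineString VARCHAR(5) = CASE WHEN @Entity_HeaderLines > 0 THEN 'true' ELSE 'false' END
DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']","readerOptions": {"delimiter": "',@Entity_Delimiter,'","header": ',@Entity_HeaderLineString,',"encoding": "',@Entity_Encoding,'"}}');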
DECLARE @Provider_DatabaseName VARCHAR(MAX) = LOWER(@Provider_Name)

DECLARE @Dsu_Id INT
DECLARE @Dsu_SecurityPath VARCHAR(10)

SELECT 
    @Dsu_Id = [Id] 
    ,@Dsu_SecurityPath = [SecurityPath]
FROM [DataIngestion].[DataStorageUnit] 
WHERE [ResourceGroupName] = @Dsu_ResourceGroupName

-------------------------------------------------------------------------------
-- Get Provider
-------------------------------------------------------------------------------
DECLARE @Provider_Id INT = (SELECT TOP 1 [Id] FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)
print(@Provider_Id)


-------------------------------------------------------------------------------
-- Get and Update Entity
-------------------------------------------------------------------------------
BEGIN

DECLARE @Entity_Id INT = (SELECT TOP 1 Id FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)
print(@Entity_Id)

--DECLARE @Entity_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id, '/', @Provider_Id)

UPDATE [DataIngestion].[Entity]
SET  [RegularExpression] = @Entity_RegularExpression
    ,[Encoding] = @Entity_Encoding
    ,[HeaderLines] = @Entity_HeaderLines
    ,[FieldDelimiter] = @Entity_Delimiter
    ,[Format] = @Entity_Format
    ,[IdTableFormat] = @IdTableFormat
    ,[StartValidDate] = @StartValidDate
WHERE [Id] = @Entity_Id

END;

-------------------------------------------------------------------------------
-- Create Attribute
-------------------------------------------------------------------------------
BEGIN

DECLARE @Attribute_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id,'/',@Provider_Id,'/',@Entity_Id)

--DELETE FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id

INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
    ,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
SELECT @Entity_Id, [Name], [HiveType], NULL, 0, 0, 0, NULL, NULL, NULL, 0, 0, [Order], 0, 0, 0, [SqlType], NULL, NULL, [Name], @Attribute_ParentSecurityPath, 0, NULL, NEWID()
FROM #Entity_Attributes

INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
    ,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
VALUES
 (@Entity_Id, 'LoadDate',       'STRING',  NULL, 0, 0, 0, NULL, NULL, 'FROM_UNIXTIME(UNIX_TIMESTAMP())', 0, 0, 500, 1, 0, 1, 'datetime2(7)', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'HasErrors',      'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'FALSE',                           0, 0, 501, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'SidraIsDeleted', 'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'SidraIsDeleted',                  0, 0, 502, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'IdSourceItem',   'INT',     NULL, 0, 0, 0, NULL, NULL, 'IdSourceItem',                    0, 0, 503, 1, 1, 1, 'int',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())

END;

-------------------------------------------------------------------------------
-- Relate with pipeline
-------------------------------------------------------------------------------
BEGIN

INSERT INTO [DataIngestion].[EntityPipeline] ([IdEntity], [IdPipeline])
SELECT [IdEntity], [IdPipeline]
FROM
    (
    SELECT @Entity_Id [IdEntity], @Pipeline_Id [IdPipeline]
    EXCEPT 
    SELECT [IdEntity], [IdPipeline] FROM [DataIngestion].[EntityPipeline]
    ) AS x

END;

-------------------------------------------------------------------------------
-- Check results
-------------------------------------------------------------------------------
SET NOCOUNT OFF;
PRINT('-------------------------------------------------------------------------------')
SELECT * FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name
SELECT * FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name
SELECT * FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[EntityPipeline] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[Asset] a WHERE [IdEntity] = @Entity_Id

COMMIT TRAN CreateEntity
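
The @Entity_Properties value that the script assembles with CONCAT is a JSON document, and building the same structure with a JSON library makes its shape easier to inspect. The following is a minimal sketch, assuming that the header flag is a JSON boolean that is true whenever the file has at least one header line; how Sidra itself interprets these reader options is not confirmed here, and the function name `entity_properties` is hypothetical.

```python
import json


def entity_properties(entity_name: str, delimiter: str = "\t",
                      header_lines: int = 1, encoding: str = "UTF-8") -> str:
    r"""Build the JSON text that the script assembles with CONCAT.

    json.dumps writes the TAB delimiter as the escape \t, which a JSON
    parser reads back as the same character as \u0009.
    """
    properties = {
        "sourceTableName": f"[staging].[{entity_name}]",
        "readerOptions": {
            "delimiter": delimiter,
            "header": header_lines > 0,  # assumption: any header line means true
            "encoding": encoding,
        },
    }
    return json.dumps(properties)


if __name__ == "__main__":
    print(entity_properties("JohnDoeFunBikes"))
```

Using a JSON library also escapes quotes and backslashes in the values, which plain string concatenation does not.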

Last update: 2023-08-09