
How CSV data intake works

In this tutorial, we detail how to configure a Sidra Provider to ingest data from CSV files into the platform.

Introduction

The data intake process for file ingestion from the landing zone is described conceptually in Asset flow into platform via landing. For this process to take place, Sidra needs the metadata - definitions for each Entity and the Attributes corresponding to the CSV columns.

Unlike database Providers, there is no automatic metadata inference pipeline available for CSV files. This means that the metadata needs to be specified manually for each of the Entities to be ingested, as well as for their Attributes.

As an overview, defining the metadata involves the following:

  • Ensuring there is a Provider for the various CSV files to be ingested.
  • Defining an Entity for each type of CSV to be ingested.
  • For each Entity (type of CSV), defining the Attributes corresponding to the columns/fields present in that CSV. Note that there are also some system-required Attributes, needed for specifying how to read and process the columns/fields in Databricks tables.
  • Associating each Entity with the ingestion Pipeline for data files.

Info

The Data Intake Process for file ingestion from the landing zone is described conceptually in the section Data intake via landing zone.

Declaring the metadata can be carried out by using the Sidra API or by running a SQL script directly on the Sidra Core metadata database.

Important Note

It is recommended to declare the metadata for the Entities (CSVs) to be ingested using the Sidra Core API calls. The reason is that the API performs additional validations, whereas adding the metadata with a SQL script does not ensure input validation.

Configuration for data intake from CSV

There are two ways to configure this kind of data intake:

  • Configuration through the API (recommended).
  • Configuration through SQL script.

1. Configuration through API calls

The following calls to the API can be performed through Swagger or PowerShell:

  • Endpoint /api/metadata/providers: used to create the Provider.
  • Endpoint /api/metadata/entities: used to create the Entities.
  • Endpoint /api/metadata/attributes: used to create the Attributes associated with an Entity.
  • Endpoint /api/metadata/entities/{idEntity}/pipelines/{idPipeline}: used to associate an Entity with a pipeline.

2. Configuration through SQL script

Although the metadata should preferably be configured via API calls, it can also be added directly into the Sidra Core metadata DB.

The provided SQL script starts with a set of variables needed to create the Sidra metadata objects. By default, the Entities will be associated with the pipeline FileIngestionDatabricks. The following steps detail the variables that need to be specified.

Step 1. Pre-requirements

We need to set up the name of the DSU resource group where we will store the information:

DECLARE @Dsu_ResourceGroupName VARCHAR(50) = 'Sidra.DSU.Default.Dev'
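
If you are not sure which value to use, the available DSUs can be listed directly from the Sidra Core metadata DB. The following helper query is not part of the script; it simply reads the same [DataIngestion].[DataStorageUnit] table that the complete script queries later:

SELECT [Id], [ResourceGroupName], [SecurityPath]
FROM [DataIngestion].[DataStorageUnit]
ORDER BY [ResourceGroupName]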

Step 2. Provider configuration

We need to specify the name, owner and description:

DECLARE @Provider_Name VARCHAR(30) = 'MyFirstCsvIngestion'
DECLARE @Provider_Owner NVARCHAR(256) = N'John.Doe@PlainConcepts.com'
DECLARE @Provider_Description NVARCHAR(MAX) = N'Attempt to feed a TAB-separated values file from the Bike-Stores sample DB'
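
Before running the script, you may also want to check whether a Provider with this name already exists (the complete script below performs the same check before inserting). A quick helper query against the metadata DB, using the provider name from this tutorial:

SELECT [Id], [ProviderName], [Owner], [Description]
FROM [DataIngestion].[Provider]
WHERE [ProviderName] = 'MyFirstCsvIngestion'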

Step 3. Entity configuration

In the case of the Entity, we will need to specify some additional data.

For example, we need to specify the file format (in this case it will be CSV), the encoding, the field delimiter (e.g., using a comma character), the number of header lines and the regular expression that is used to link the files deposited in the landing zone with the corresponding Entity.

The regular expression described in the example below identifies a file with the following name. You may download the file to view the data:

Bikes-Stores.Bikes_2022-01-28_16-30-21.csv

This expression always needs to capture the date of the file, so that Sidra can identify the latest Assets for that Entity.

The sample SQL script would look similar to this one:

DECLARE @Entity_Name VARCHAR(256) = 'BikeStoreBikes'
DECLARE @Entity_RegularExpression NVARCHAR(500) = '^Bikes-Stores.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv'
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009' -- TAB character ('\t'). Other example: '\u002C' = ','
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8'
DECLARE @Entity_HeaderLines INT = 1
DECLARE @Entity_Format VARCHAR(10) = 'csv'

Step 4. Attribute configuration

Each of the columns of this CSV file will map to an Attribute in Sidra. Therefore, we need to indicate each one of the fields (or columns) contained in the CSV file, as well as the type to be used for it in Databricks and in SQL Server.

The sample SQL script would look similar to this one:

DROP TABLE IF EXISTS #Entity_Attributes
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30))
INSERT INTO #Entity_Attributes ([Order], [Name], [HiveType], [SqlType])
VALUES
     (1,    'product_id',           'INT',          'INT')
    ,(2,    'product_name',         'STRING',       'NVARCHAR(128)')
    ,(3,    'brand_name',           'STRING',       'NVARCHAR(32)')
    ,(4,    'category_name',        'STRING',       'NVARCHAR(64)')
    ,(5,    'model_year',           'INT',          'SMALLINT')
    ,(6,    'list_price',           'DECIMAL',      'DECIMAL(10, 2)')

This example would correspond to a CSV file with the below structure:

product_id  product_name                         brand_name  category_name   model_year  list_price
1           Trek 820 - 2016                      Trek        Mountain Bikes  2016        379.99
2           Ritchey Timberwolf Frameset - 2016   Ritchey     Mountain Bikes  2016        749.99
3           Surly Wednesday Frameset - 2016      Surly       Mountain Bikes  2016        999.99
4           Trek Fuel EX 8 29 - 2016             Trek        Mountain Bikes  2016        2899.99

Complete script

We will need to modify the variables as explained in the steps above. The script is divided into sections, one for each of the actions outlined above.

If the Provider or the Entity to be inserted already exists, the script ignores it and continues with the next section.

For the Attributes, the script will always remove all the Attributes related to the specified Entity and insert them again.

The Entity-Pipeline association will be inserted if it does not exist yet in the EntityPipeline table. The pipeline to be associated will be FileIngestionDatabricks.

DECLARE @Dsu_ResourceGroupName VARCHAR(50) = 'Sidra.DSU.Default.Dev'

DECLARE @Provider_Name VARCHAR(30) = 'MyFirstCsvIngestion'
DECLARE @Provider_Owner NVARCHAR(256) = N'John.Doe@PlainConcepts.com'
DECLARE @Provider_Description NVARCHAR(MAX) = N'Attempt to feed a TAB-separated values file from the Bike-Stores sample DB'

DECLARE @Entity_Name VARCHAR(MAX) = 'BikeStoreBikes'
DECLARE @Entity_RegularExpression NVARCHAR(MAX) = '^Bikes-Stores.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv'
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009' -- TAB character ('\t'). Other example: '\u002C' = ','
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8'
DECLARE @Entity_HeaderLines INT = 1
DECLARE @Entity_Format VARCHAR(10) = 'csv'

DROP TABLE IF EXISTS #Entity_Attributes
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30))
INSERT INTO #Entity_Attributes ([Order], [Name], [HiveType], [SqlType])
VALUES
     (1,    'product_id',           'INT',          'INT')
    ,(2,    'product_name',         'STRING',       'NVARCHAR(128)')
    ,(3,    'brand_name',           'STRING',       'NVARCHAR(32)')
    ,(4,    'category_name',        'STRING',       'NVARCHAR(64)')
    ,(5,    'model_year',           'INT',          'SMALLINT')
    ,(6,    'list_price',           'DECIMAL',      'DECIMAL(10, 2)')

-------------------------------------------------------------------------------

SET NOCOUNT ON;
BEGIN TRAN CreateEntity

DECLARE @CurrentDate DATETIME2 = GETUTCDATE()

DECLARE @Pipeline_ItemId UNIQUEIDENTIFIER = 
    (SELECT TOP(1) P.[ItemId] FROM [DataIngestion].[Pipeline] P WHERE [Name] = 'FileIngestionDatabricks')
    -- This should be: F8CD4AD9-DCAE-4463-9185-58292D14BE99
DECLARE @Pipeline_Id INT = (SELECT TOP (1) P.[Id] FROM [DataIngestion].[Pipeline] P WHERE P.[ItemId] = @Pipeline_ItemId)

DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']"}')
DECLARE @Provider_DatabaseName VARCHAR(MAX) = LOWER(@Provider_Name)

DECLARE @Dsu_Id INT
DECLARE @Dsu_SecurityPath VARCHAR(10)

SELECT 
    @Dsu_Id = [Id] 
    ,@Dsu_SecurityPath = [SecurityPath]
FROM [DataIngestion].[DataStorageUnit] 
WHERE [ResourceGroupName] = @Dsu_ResourceGroupName

-------------------------------------------------------------------------------
-- Create provider
-------------------------------------------------------------------------------
BEGIN

IF NOT EXISTS (SELECT * FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)
BEGIN
    INSERT INTO [DataIngestion].[Provider]
        ([ItemId], [ProviderName], [DatabaseName], [Description], [Owner], [IdDataStorageUnit], [ParentSecurityPath], [CreationDate])
    VALUES
        (NEWID(), @Provider_Name, @Provider_DatabaseName, @Provider_Description, @Provider_Owner, @Dsu_Id, @Dsu_SecurityPath, @CurrentDate)
END

DECLARE @Provider_Id INT = (SELECT TOP 1 [Id] FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)

END;

-------------------------------------------------------------------------------
-- Create entity
-------------------------------------------------------------------------------
BEGIN

DECLARE @Entity_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id, '/', @Provider_Id)

IF NOT EXISTS (SELECT * FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)
BEGIN
    INSERT INTO [DataIngestion].[Entity]
        ([IdProvider], [Name], [TableName], [RegularExpression],
        [StartValidDate], [EndValidDate], [Serde], [SerdeProperties], [Encoding], [HeaderLines], [FieldDelimiter],
        [LastUpdated], [LastDeployed], [Format], [NullText], [ReCreateTableOnDeployment], [RowDelimiter], [FilesPerDrop],
        [SourcePath], [Description], [AdditionalProperties], [IdTableFormat], [GenerateDeltaTable], [ParentSecurityPath], [CreationDate], [ItemId]) 
    VALUES 
        (@Provider_Id, @Entity_Name, @Entity_Name, @Entity_RegularExpression,
        DATEADD(DAY, -1, @CurrentDate), NULL, NULL, NULL, @Entity_Encoding, @Entity_HeaderLines, @Entity_Delimiter,
        @CurrentDate, NULL, @Entity_Format, NULL, 1, NULL, 1,
        NULL, NULL, @Entity_Properties, 3, 0, @Entity_ParentSecurityPath, @CurrentDate, NEWID())
END

DECLARE @Entity_Id INT = (SELECT TOP 1 Id FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)

END;

-------------------------------------------------------------------------------
-- Create Attribute
-------------------------------------------------------------------------------
BEGIN

DECLARE @Attribute_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id,'/',@Provider_Id,'/',@Entity_Id)

DELETE FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id

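-- Attributes corresponding to the CSV columns defined in #Entity_Attributes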
INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
    ,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
SELECT @Entity_Id, [Name], [HiveType], NULL, 0, 0, 0, NULL, NULL, NULL, 0, 0, [Order], 0, 0, 0, [SqlType], NULL, NULL, [Name], @Attribute_ParentSecurityPath, 0, NULL, NEWID()
FROM #Entity_Attributes

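-- System-required Attributes (LoadDate, HasErrors, SidraIsDeleted, IdSourceItem),
-- needed by the platform to read and process the data in Databricks tables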
INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],[HiveType],[MaxLen],[IsNullable],[NeedTrim],[RemoveQuotes],[ReplacedText],[ReplacementText],[SpecialFormat],[TreatEmptyAsNull],[IsPrimaryKey],[Order]
    ,[IsCalculated],[IsPartitionColumn],[IsMetadata],[SQLType],[ValidationText],[Description],[SourceName],[ParentSecurityPath],[IsEncrypted],[DataMask],[ItemId])
VALUES
 (@Entity_Id, 'LoadDate',       'STRING',  NULL, 0, 0, 0, NULL, NULL, 'FROM_UNIXTIME(UNIX_TIMESTAMP())', 0, 0, 500, 1, 0, 1, 'datetime2(7)', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'HasErrors',      'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'FALSE',                           0, 0, 501, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'SidraIsDeleted', 'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'SidraIsDeleted',                  0, 0, 502, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'IdSourceItem',   'INT',     NULL, 0, 0, 0, NULL, NULL, 'IdSourceItem',                    0, 0, 503, 1, 1, 1, 'int',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())

END;

-------------------------------------------------------------------------------
-- Relate with pipeline
-------------------------------------------------------------------------------
BEGIN

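-- Associate the Entity with the pipeline only if the association does not exist yet in [DataIngestion].[EntityPipeline]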
INSERT INTO [DataIngestion].[EntityPipeline] ([IdEntity], [IdPipeline])
SELECT [IdEntity], [IdPipeline]
FROM
    (
    SELECT @Entity_Id [IdEntity], @Pipeline_Id [IdPipeline]
    EXCEPT 
    SELECT [IdEntity], [IdPipeline] FROM [DataIngestion].[EntityPipeline]
    ) AS x

END;

-------------------------------------------------------------------------------
-- Check results
-------------------------------------------------------------------------------
SET NOCOUNT OFF;
PRINT('-------------------------------------------------------------------------------')
SELECT * FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name
SELECT * FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name
SELECT * FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[EntityPipeline] WHERE [IdEntity] = @Entity_Id
SELECT * FROM [DataIngestion].[Asset] a WHERE [IdEntity] = @Entity_Id

COMMIT TRAN CreateEntity

Info

Do not forget to populate the parameters according to your scenario and metadata names. You can download the complete script here.

Sample load from a CSV file

Once the file metadata has been configured, we can execute a test load. The following steps are required:

1. Rename the CSV file

  • The name needs to follow the regular expression mentioned in step Entity configuration.
  • The name of the file also needs to include the date and time. In our example: Bikes-Stores.Bikes_2022-01-28_16-30-21.csv.

2. Deposit the file in the landing zone

  • The landing zone is in the DSU Resource Group, in the Storage Account whose name contains stage.
  • Inside this account there is a container with the name landing.

(Image: CSV tutorial landing container)

  • It is highly recommended to upload the file to a path that contains the Provider and the Entity of the file, like:
    /<name_provider>/<name_entity>/<file>.csv
    /<name_provider>/<name_entity>/<year>/<month>/<day>/<file>.csv

  • If the metadata and file configuration are correct:

    • Sidra will start automatically to load the file in the DSU.
    • In Databricks, there will be a table with the name of the Provider and Entity.
    • This table will contain the CSV information in delta format.
    • The data will be stored in Databricks in an incremental way: each new CSV file that corresponds to this Entity will append new records to the corresponding table in Databricks (see the query sketch below).
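
As a quick way to verify this incremental behaviour, you can query the table from a Databricks notebook after each file drop. The following Spark SQL query is only a sketch: the database and table names are assumptions derived from the Provider and Entity names used in this tutorial, and IdSourceItem is the system Attribute that identifies the Asset (file) each record was loaded from:

-- Count the ingested records per source file (adjust database/table names to your deployment)
SELECT IdSourceItem, COUNT(*) AS RecordCount
FROM myfirstcsvingestion.bikestorebikes
GROUP BY IdSourceItem
ORDER BY IdSourceItem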

Example

Here we can see an example of what can be visualized in Databricks after loading two CSV files corresponding to the same configured Entity:

  • Provider: MyFirstCsvIngestion.
  • Entity: BikeStoreBikes.
  • The sample CSV is depicted above; you may download it for a test.
  • Databricks table:


Ingestion with record sets merges

The above instructions assume that records are simply added (appended) to the existing ones in the Data Lake table(s).

What about merging? What if records from a new feed - a new CSV/Excel file - are supposed to update records that have already been ingested?

Such an ingestion case makes use of primary keys:

  • The records being ingested from the CSV files would have a field (or more) declared as Primary Key.
  • The fields of the records identified with the Primary Key would be updated, if the record already exists in the target DSU (Data Storage Unit) table.

Two requirements need to be satisfied in order to update the records, i.e. to merge the new incoming records with the ones that have already been ingested. Without them, the new incoming records will simply be appended to the existing ones, which may lead to data duplication or inconsistencies. These requirements are:

  1. The Entity must be configured with the { "consolidationMode": "Merge" } additional property.
  2. Among the Attributes of the Entity, one of the fields must be declared as Primary Key.

The "consolidationMode": "Merge" attribute will be an additional property of the Entity:

  • This property needs to be placed inside the [AdditionalProperties] field of the Entity record (a sketch for updating an existing Entity follows this list).
  • The [AdditionalProperties] field expects a JSON object defining extensibility options for the Entity.
  • Other properties may also be present in the JSON populating the [AdditionalProperties] field of the Entity record.
  • Entity records are stored in Sidra's Core DB, in the [DataIngestion].[Entity] table. Remember that "Entity" in this context should be seen as the data table that will hold the records from the CSV/Excel files: an Entity is a data table definition, with Attributes being its fields, or columns.
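
If the Entity has already been created without this property, the [AdditionalProperties] JSON can be updated afterwards directly in the metadata DB. A minimal sketch, assuming the Entity name used in this tutorial and that the field so far only contains the sourceTableName property shown in the script above:

UPDATE [DataIngestion].[Entity]
SET [AdditionalProperties] = '{"sourceTableName": "[staging].[BikeStoreBikes]", "consolidationMode": "Merge"}'
WHERE [Name] = 'BikeStoreBikes'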

In the example SQL script defining a CSV ingestion with record updates, note that the Entity's [AdditionalProperties] field will contain "consolidationMode": "Merge"; look for this line:

DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']", "consolidationMode": "Merge"}')

Also, in the same file, notice that the first field, product_id, is declared as a Primary Key Attribute: [IsPrimaryKey] = 1 (a sketch of the supporting changes follows the snippet below).

INSERT INTO #Entity_Attributes
     ([Order],      [Name],                 [HiveType],         [SqlType],              [IsPrimaryKey]  )
VALUES
     (1,            'product_id',           'INT',              'INT',                  1               )
    ,(2,            'product_name',         'STRING',           'NVARCHAR(128)',        0               )
    ,(3,            'brand_name',           'STRING',           'NVARCHAR(32)',         0               )
    ,(4,            'category_name',        'STRING',           'NVARCHAR(64)',         0               )
    ,(5,            'model_year',           'INT',              'SMALLINT',             0               )
    ,(6,            'list_price',           'DECIMAL',          'DECIMAL(10, 2)',       0               )
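
For the primary key flag to reach the metadata, the rest of the script has to carry it through as well. A minimal sketch of the supporting changes, assuming the same structure as the complete script above:

-- The temporary table also needs the [IsPrimaryKey] column
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30), [IsPrimaryKey] BIT)

-- ...and the Attribute insert propagates it instead of the hard-coded 0 used in the append-only script
SELECT @Entity_Id, [Name], [HiveType], NULL, 0, 0, 0, NULL, NULL, NULL, 0, [IsPrimaryKey], [Order], 0, 0, 0, [SqlType], NULL, NULL, [Name], @Attribute_ParentSecurityPath, 0, NULL, NEWID()
FROM #Entity_Attributes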

For experimentation purposes, you may download, customize, then use the following files:


Last update: 2022-05-17