
How CSV data intake works

Information

Take into account that this tutorial is meant as an illustrative guide. Since the platform is subject to continuous development, there may be minor changes to the code displayed in the scripts. Use these scripts as starters to build your own. For more information, you can contact Sidra's Support Team.

In this tutorial we detail how to configure a Sidra Provider to ingest data from CSV files into the platform.

Introduction

The data intake process for file ingestion from the landing zone is described conceptually in Asset flow into platform via landing. For this process to take place, Sidra needs the metadata - definitions for each Entity and the Attributes corresponding to the CSV columns.

Unlike database Providers, there is no automatic metadata inference pipeline available for CSV files. This means that this metadata needs to be specified manually, for each of the Entities that should be ingested, as well as for their Attributes.

As an overview, defining the metadata involves the following steps:

  1. Ensure we have a Provider for the various CSV files to be ingested.
  2. Define an Entity for each type of CSV to be ingested.
  3. For each Entity (type of CSV), define the Attributes corresponding to the columns/fields present in that CSV.
    • Note that there are also some system-required Attributes, needed for specifying how to read and process the columns/fields in Databricks tables.
  4. Associate the Entity with the ingestion Pipeline for data files.

Declaring the metadata can be carried out by either calling the Sidra API, or executing a SQL script directly on the Sidra Core metadata database.

Important Note

It is recommended to declare the metadata for the Entities (CSVs) to be ingested using the Sidra Core API calls, since the API performs additional validations. By contrast, adding the metadata with a SQL script does not ensure input validation.

Defining the metadata

Information

Take into account that Sidra metadata tables do not allow for space characters in the names of Attributes or Entities. Starting with the 2022.R2 (1.12.2) update, the metadata API will sanitize spaces if they are provided in Entity or Attribute names, replacing them with the `_` character. Note that this name sanitation is only done by the API when adding metadata; the SQL scripts will not do it, so spaces must be avoided in Attribute or Entity names when using that method.

There are rules for configuring the metadata. Below are sample scripts to do so - for PowerShell-based API calls or direct Transact-SQL - but you should be aware of the restrictions, limitations, and settings affecting the ingestion.

Defining the Provider

The Provider should be regarded as the data source, such as the database or the system generating the CSV files. The main fields for a Provider definition are:

  • The [ProviderName] is the human-friendly identifier, no longer than 30 characters. Keep in mind that this field only takes unique values; two Providers cannot have the same name.
  • The [DatabaseName] will identify the target Databricks destination DB and the Azure Storage container for Assets. Hence, observe the naming limitations as per Azure's documentation.
  • The DSU identifier, or the Azure DSU Resource Group, indicates where the data is ingested into Databricks. Most Sidra deployments use only one DSU. The DSU records are stored in the Core DB in the [DataIngestion].[DataStorageUnit] table (a lookup sketch follows this list).
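
To pick the right value, the available DSUs can be listed directly from the Core metadata DB. The sketch below assumes the SqlServer PowerShell module and read access to that database; the server and database names are placeholders for your own deployment.

# List the available DSUs so the right ResourceGroupName can be used in the Provider definition.
Import-Module SqlServer

$coreDb = @{
    ServerInstance = 'my-sidra-core-sql.database.windows.net'   # placeholder
    Database       = 'SidraCoreDb'                               # placeholder
}
Invoke-Sqlcmd @coreDb -Query 'SELECT [Id], [ResourceGroupName] FROM [DataIngestion].[DataStorageUnit];'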

Defining the Entity

It helps to think of the Entity as a database table; each Entity will have a Databricks table associated with it. The Entity metadata describes how the data is to be ingested into that table. Things to observe:

  • The [TableName] field of the Entity will be used to identify the Databricks table; hence, restrictions apply.
    Most notably, the space ' ' character must not be used. In fact, the Java libraries in Databricks will reject table names containing " ,;{}()\n\t".
  • The [RegularExpression] acts like a filter and mapper. Only CSV files with names matching the RegEx will be associated to this Entity and its Pipeline. CSV files not matching any Entity RegEx will be rejected as noise artifacts.
    Rejections will be logged as errors in the [Log].[Util].[Log] database table.
  • The [StartValidDate] and [EndValidDate] are also filters. CSV blobs that don't fall in the interval will not be ingested.
    NOTE: The Asset date, which is considered for the interval, is deduced from the CSV blob name or path:
    • either its path should contain /<year>/<month>/<day>/ segments; or
    • its file name should indicate the date, and the RegEx should be built to extract these parts (see the sanity-check sketch after this list and the full example below).
  • Mind the [FieldDelimiter]: although CSV denotes Comma-Separated Values, the delimiter may also be TAB \t, pipe |, ampersand &, circumflex ^, or others.
    Place the escaped Unicode representation of the delimiter in this field, like \u0009 for TAB, \u002C for comma, or others.
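
Before registering the Entity, it can be useful to sanity-check the [RegularExpression] against a sample blob name. The sketch below is only a local check using PowerShell's .NET regular expressions (which support the (?<year>...) named groups used in the sample); it does not replace the platform's own matching.

# Check that a sample file name matches the Entity RegularExpression and that the date groups are extracted.
$regex    = '^Bike-Store.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv'
$fileName = 'Bike-Store.Bikes_2022-01-28_16-30-21.csv'

$match = [regex]::Match($fileName, $regex)
if ($match.Success) {
    # The named groups carry the pieces of the Asset date deduced from the file name.
    $assetDate = '{0}-{1}-{2}' -f $match.Groups['year'].Value, $match.Groups['month'].Value, $match.Groups['day'].Value
    "Matched. Asset date: $assetDate"    # -> Matched. Asset date: 2022-01-28
}
else {
    Write-Warning 'The file name does not match the Entity RegularExpression; the blob would be rejected.'
}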

Defining the Attributes

Just like an Entity describes a table, the Attributes describe the table columns (the record fields). A few things to observe:

  • Don't use the space ' ' character in the [Name]. The Java libraries in Databricks will reject column names containing " ,;{}()\n\t".
  • The [HiveType] and [SqlType] describe what kind of data the column has, and how it should be treated by Databricks or any SQL DB used in Client Apps.
  • With CSV ingestion, there are four system-required Attributes used in the Data Lake (LoadDate, HasErrors, SidraIsDeleted and IdSourceItem). These are created by the sample scripts below. They should come last with respect to the [Order] field of the Attributes.
  • One of the columns in the CSV should be a Primary Key, if records are planned to be updated with CSV ingestions for the Entity. Its corresponding Attribute should have [IsPrimaryKey] set to 1.

Pipeline and Stage landing

The defined Entity should be processed by the FileIngestionDatabricks Pipeline; so, a mapping record should exist in the [DataIngestion].[EntityPipeline] table. This pipeline will be responsible for:

  1. Registering a new Asset for each new CSV file to be ingested for the Entity. Once registered, the Asset's CSV blob is moved under a Stage container with the name of the Provider.
  2. Creating (or re-creating) the target Databricks table where records would be ingested, if that table does not exist.
  3. Creating (or re-creating) a transfer query in the Databricks workspace, if it does not exist, for the Provider's Entity.
  4. Running the transfer query to bring data from the registered Asset (CSV blob) into the Databricks Entity's table.

The CSV files should be dropped under the landing container of the DSU Stage Azure Storage Account - the account containing stage in its name. Additionally:

  • These file blobs should be placed under a folder like /<provider-name>/<entity-name>/.
  • The file names should match the [RegularExpression] set for the Entity.
  • The full name of the resulting blob should indicate the Asset date:
    • either its path should contain /<year>/<month>/<day>/ segments; or
    • its file name should indicate the date, and the RegEx matching should be built to extract these (see example below).

If everything is correct, the above Pipeline would pick the landed CSV blob and start processing it, resulting in a Data Factory pipeline run. At the end of the Pipeline run, the records from the CSV file will be visible in the Entity's Databricks table.

Step-by-step

Add new Provider

For more information, check the specific tutorial for adding a new Provider.

Add new Entity

For more information, check the specific tutorial for adding a new Entity.

Add new Attribute

For more information, check the specific tutorial for adding a new Attribute.

Associate an Entity with a pipeline

For more information, check the specific tutorial for associating Entities and pipelines.

Assets metadata

You can check as well the documentation regarding Assets metadata.

Configuration through API calls

The following calls to the Sidra Core API can be performed through Swagger or PowerShell, to declare the metadata as per the above instructions (a sample call sketch follows this list):

  • Endpoint /api/metadata/providers: used to create the Provider.
  • Endpoint /api/metadata/entities: used to create the Entities.
  • Endpoint /api/metadata/attributes: used to create the Attributes associated to an Entity.
  • Endpoint /api/metadata/entities/{idEntity}/pipelines/{idPipeline}: used to associate an Entity with a pipeline.
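
As an illustration, a Provider could be created from PowerShell as sketched below. The base URL, the way the OAuth bearer token is obtained, and the body field names are assumptions for illustration only; check the Swagger definition of your Sidra Core API installation for the exact request schema.

# Hypothetical Provider creation call; field names are assumed from the metadata fields described above.
$baseUrl = 'https://my-sidra-core-api.azurewebsites.net'    # placeholder
$token   = '<OAuth bearer token obtained for the API>'      # placeholder

$body = @{
    providerName = 'MyFirstCsvIngestion'
    databaseName = 'myfirstcsvingestion'
    owner        = 'John.Doe@PlainConcepts.com'
    description  = 'Feed a TAB-separated values CSV file exported from the [Bike-Store] sample SQL Server DB.'
} | ConvertTo-Json

Invoke-RestMethod -Method Post -Uri "$baseUrl/api/metadata/providers" `
    -Headers @{ Authorization = "Bearer $token" } `
    -ContentType 'application/json' -Body $body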

Helper PowerShell script

To help automate defining the metadata, you can use the following PowerShell script template and its associated input file:

  • The Define-CSV-entity.ps1 calls the Sidra Core API, based on the information passed to it.
  • The Define-CSV-entity.psd1 should be populated with the needed info and then passed as -ParamsFile parameter to the above script.

Because the parameters needed by the script are complex, they may be populated in a PowerShell-specific PSD1 file (an illustrative sketch follows this list). The sample above has the needed structure for passing:

  • Information about how to call the Sidra Core API: endpoints and obtaining the OAuth authorization token.
  • Required metadata for Provider, the CSV Entity and the Attributes pertaining to CSV columns.
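
The shape below is only an illustration of how such a PSD1 data file groups the API connection details and the Provider/Entity/Attribute metadata; the key names are hypothetical and will not match the real Define-CSV-entity.psd1 - use the downloaded template as the authoritative structure, and pass it with -ParamsFile as described above.

# Illustrative PSD1 structure (hypothetical key names).
@{
    Api = @{
        BaseUrl = 'https://my-sidra-core-api.azurewebsites.net'   # placeholder
        # OAuth client settings used to obtain the authorization token would go here.
    }
    Provider = @{
        Name        = 'MyFirstCsvIngestion'
        Owner       = 'John.Doe@PlainConcepts.com'
        Description = 'Feed a TAB-separated values CSV file exported from the Bike-Store sample DB.'
    }
    Entity = @{
        Name              = 'BikeStoreBikes'
        RegularExpression = '^Bike-Store.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv'
        FieldDelimiter    = '\u0009'
        Encoding          = 'UTF-8'
        HeaderLines       = 1
        Format            = 'csv'
    }
    Attributes = @(
        @{ Order = 1; Name = 'product_id';   HiveType = 'INT';    SqlType = 'INT' }
        @{ Order = 2; Name = 'product_name'; HiveType = 'STRING'; SqlType = 'NVARCHAR(128)' }
        # ... remaining CSV columns ...
    )
}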

Configuration through SQL script

Although the metadata should be configured via API calls, this metadata can also be directly added into the Sidra Core metadata DB.

The provided SQL script presents a set of variables at the start that are needed to create the Sidra metadata objects. By default the Entities will be associated with the pipeline FileIngestionDatabricks. Next, we will detail the variables that must be specified.

Pre-requirements

We need to set up the name of the DSU resource group where we will store the information. The DSU records are stored in the Core DB in [DataIngestion].[DataStorageUnit] table.

DECLARE @Dsu_ResourceGroupName VARCHAR(50) = 'Sidra.DSU.Default.Test';

Provider configuration

We need to specify the name, owner and description:

DECLARE @Provider_Name VARCHAR(30) = 'MyFirstCsvIngestion';
DECLARE @Provider_Owner NVARCHAR(256) = N'John.Doe@PlainConcepts.com';
DECLARE @Provider_Description NVARCHAR(MAX) = N'Feed a TAB-separated values CSV file exported from the [Bike-Store] sample SQL Server DB.';

Entity configuration

In the case of the Entity, we will need to specify some additional data. For example, we need to specify the file format, the text encoding, the field delimiter, the number of header lines, and the regular expression used to filter the files deposited in the landing zone with the corresponding Entity.

The regular expression in the example below identifies a file with the following name. You may download the file to view the data:

Bike-Store.Bikes_2022-01-28_16-30-21.csv

This expression needs to match the file name. It also helps determine the [AssetDate], so that Sidra can identify the latest Asset for that Entity, if we're not dropping the file under a /<year>/<month>/<day>/ folder.

The sample SQL script would look similar to this one:

DECLARE @Entity_Name VARCHAR(256) = 'BikeStoreBikes';
DECLARE @Entity_RegularExpression NVARCHAR(500) = '^Bike-Store.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv';
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009'; -- => TAB char '\t'. Others: \u002C=','    \u007C='|'    \u0026='&'   \u005E='^'
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8';
DECLARE @Entity_HeaderLines INT = 1;
DECLARE @Entity_Format VARCHAR(10) = 'csv';
DECLARE @Entity_StartValidDate DATETIME = CAST('2020-12-25' AS DATETIME);

Attribute configuration

Each of the columns of this CSV file will map to an Attribute in Sidra. Therefore, we need to indicate each one of the fields (or columns) contained in the CSV file, as well as the type of this field to be used in Databricks and SQL Server.

The sample SQL script would look similar to this one:

DROP TABLE IF EXISTS #Entity_Attributes;
CREATE TABLE #Entity_Attributes ([Order] INT, [Name] VARCHAR(150), [HiveType] VARCHAR(30), [SqlType] VARCHAR(30));
INSERT INTO #Entity_Attributes
         ([Order],  [Name],                 [HiveType],             [SqlType]           )
    VALUES
         (1,        'product_id',           'INT',                  'INT'               )
        ,(2,        'product_name',         'STRING',               'NVARCHAR(128)'     )
        ,(3,        'brand_name',           'STRING',               'NVARCHAR(32)'      )
        ,(4,        'category_name',        'STRING',               'NVARCHAR(64)'      )
        ,(5,        'model_year',           'INT',                  'SMALLINT'          )
        ,(6,        'list_price',           'DECIMAL(10, 2)',       'DECIMAL(10, 2)'    )
;

This example would correspond to a CSV file with the below structure:

product_id  product_name                        brand_name  category_name   model_year  list_price
1           Trek 820 - 2016                     Trek        Mountain Bikes  2016        379.99
2           Ritchey Timberwolf Frameset - 2016  Ritchey     Mountain Bikes  2016        749.99
3           Surly Wednesday Frameset - 2016     Surly       Mountain Bikes  2016        999.99
4           Trek Fuel EX 8 29 - 2016            Trek        Mountain Bikes  2016        2899.99


Complete script

We will need to modify the variables as already explained in the above steps. The script is divided into different sections, where the actions described above are outlined separately.

If the Provider or the Entity to be inserted already exists, the script will skip it and continue to the next section.

For the Attributes: the script will remove all the already-existing records for the Entity, before adding the configured ones, to prevent duplicate Attribute entries.

The Entity-Pipeline association will be inserted if it does not exist yet in the [EntityPipeline] table. The pipeline to be associated will be FileIngestionDatabricks.

DECLARE @Dsu_ResourceGroupName VARCHAR(50) = 'Sidra.DSU.Default.Test';

DECLARE @Provider_Name VARCHAR(30) = 'MyFirstCsvIngestion';
DECLARE @Provider_Owner NVARCHAR(256) = N'John.Doe@PlainConcepts.com';
DECLARE @Provider_Description NVARCHAR(MAX) = N'Feed a TAB-separated values CSV file exported from the [Bike-Store] sample SQL Server DB.';

DECLARE @Entity_Name VARCHAR(256) = 'BikeStoreBikes';
--  File name being fed: Bike-Store.Bikes_2022-01-28_16-30-21.csv
DECLARE @Entity_RegularExpression NVARCHAR(500) = '^Bike-Store.Bikes_((?<year>\d{4})-(?<month>\d{2})-(?<day>\d{2}))_((?<hour>\d{2})-(?<minute>\d{2})-(?<second>\d{2})).csv';
DECLARE @Entity_Delimiter VARCHAR(6) = '\u0009'; -- => TAB char '\t'. Others: \u002C=','    \u007C='|'    \u0026='&'   \u005E='^'
DECLARE @Entity_Encoding VARCHAR(50) = 'UTF-8';
DECLARE @Entity_HeaderLines INT = 1;
DECLARE @Entity_Format VARCHAR(10) = 'csv';
DECLARE @Entity_StartValidDate DATETIME = CAST('2020-12-25' AS DATETIME);

DROP TABLE IF EXISTS #Entity_Attributes;
CREATE TABLE #Entity_Attributes
    (
            [Order] INT,
            [Name] VARCHAR(150),
            [HiveType] VARCHAR(30),
            [SqlType] VARCHAR(30)
    );
INSERT INTO #Entity_Attributes
         ([Order],      [Name],                 [HiveType],             [SqlType]           )
    VALUES
         (1,            'product_id',           'INT',                  'INT'               )
        ,(2,            'product_name',         'STRING',               'NVARCHAR(128)'     )
        ,(3,            'brand_name',           'STRING',               'NVARCHAR(32)'      )
        ,(4,            'category_name',        'STRING',               'NVARCHAR(64)'      )
        ,(5,            'model_year',           'INT',                  'SMALLINT'          )
        ,(6,            'list_price',           'DECIMAL(10, 2)',       'DECIMAL(10, 2)'    )
;
-------------------------------------------------------------------------------

SET NOCOUNT ON;
BEGIN TRAN CreateEntity;

DECLARE @CurrentDate DATETIME2 = GETUTCDATE();

DECLARE @Pipeline_ItemId UNIQUEIDENTIFIER = (SELECT TOP(1) [ItemId] FROM [DataIngestion].[Pipeline] WHERE [Name] = 'FileIngestionDatabricks');
DECLARE @Pipeline_Id INT = (SELECT TOP (1) p.[Id] FROM [DataIngestion].[Pipeline] p WHERE p.[ItemId] = @Pipeline_ItemId);

DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']"}');
DECLARE @Provider_DatabaseName VARCHAR(MAX) = LOWER(@Provider_Name);

DECLARE @Dsu_Id INT;
DECLARE @Dsu_SecurityPath VARCHAR(10);

SELECT 
        @Dsu_Id = [Id] 
        ,@Dsu_SecurityPath = [SecurityPath]
    FROM [DataIngestion].[DataStorageUnit] 
    WHERE [ResourceGroupName] = @Dsu_ResourceGroupName;

-------------------------------------------------------------------------------
-- Create Provider
-------------------------------------------------------------------------------
BEGIN

IF NOT EXISTS (SELECT * FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)
BEGIN
    INSERT INTO [DataIngestion].[Provider]
        (
            [ItemId], 
            [ProviderName], 
            [DatabaseName], 
            [Description], 
            [Owner], 
            [IdDataStorageUnit], 
            [ParentSecurityPath], 
            [CreationDate])
    VALUES
        (
            NEWID(),                    -- [ItemId]
            @Provider_Name,             -- [ProviderName]
            @Provider_DatabaseName,     -- [DatabaseName]
            @Provider_Description,      -- [Description]
            @Provider_Owner,            -- [Owner]
            @Dsu_Id,                    -- [IdDataStorageUnit]
            @Dsu_SecurityPath,          -- [ParentSecurityPath]
            @CurrentDate                -- [CreationDate]
        )
END

DECLARE @Provider_Id INT = (SELECT TOP 1 [Id] FROM [DataIngestion].[Provider] WHERE [ProviderName] = @Provider_Name)

END;

-------------------------------------------------------------------------------
--  Create Entity
-------------------------------------------------------------------------------
BEGIN

DECLARE @Entity_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id, '/', @Provider_Id)

IF NOT EXISTS (SELECT * FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)
BEGIN
    INSERT INTO [DataIngestion].[Entity]
        (
            [IdProvider],
            [Name],
            [TableName],
            [RegularExpression],
            [StartValidDate], 
            [EndValidDate],
            [Serde],
            [SerdeProperties],
            [Encoding],
            [HeaderLines],
            [FieldDelimiter],
            [LastUpdated],
            [LastDeployed],
            [Format],
            [NullText],
            [ReCreateTableOnDeployment],
            [RowDelimiter],
            [FilesPerDrop],
            [SourcePath],
            [Description],
            [AdditionalProperties],
            [IdTableFormat],
            [GenerateDeltaTable],
            [ParentSecurityPath],
            [CreationDate],
            [ItemId]
        ) 
    VALUES 
        (
            @Provider_Id,                  --  [IdProvider],
            @Entity_Name,                  --  [Name],
            @Entity_Name,                  --  [TableName],
            @Entity_RegularExpression,     --  [RegularExpression],
            @Entity_StartValidDate,        --  [StartValidDate], 
            NULL,                          --  [EndValidDate],
            NULL,                          --  [Serde],
            NULL,                          --  [SerdeProperties],
            @Entity_Encoding,              --  [Encoding],
            @Entity_HeaderLines,           --  [HeaderLines],
            @Entity_Delimiter,             --  [FieldDelimiter],
            @CurrentDate,                  --  [LastUpdated],
            NULL,                          --  [LastDeployed],
            @Entity_Format,                --  [Format],
            NULL,                          --  [NullText],
            1,                             --  [ReCreateTableOnDeployment],
            NULL,                          --  [RowDelimiter],
            1,                             --  [FilesPerDrop],
            NULL,                          --  [SourcePath],
            NULL,                          --  [Description],
            @Entity_Properties,            --  [AdditionalProperties],
            3,                             --  [IdTableFormat],
            0,                             --  [GenerateDeltaTable],
            @Entity_ParentSecurityPath,    --  [ParentSecurityPath],
            @CurrentDate,                  --  [CreationDate],
            NEWID()                        --  [ItemId]
        );
END

DECLARE @Entity_Id INT = (SELECT TOP 1 Id FROM [DataIngestion].[Entity] WHERE [Name] = @Entity_Name)

END;

-------------------------------------------------------------------------------
--  Create Attributes
-------------------------------------------------------------------------------
BEGIN

DECLARE @Attribute_ParentSecurityPath VARCHAR(100) = CONCAT(@Dsu_Id,'/',@Provider_Id,'/',@Entity_Id)

DELETE FROM [DataIngestion].[Attribute] WHERE [IdEntity] = @Entity_Id

INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],HiveType,MaxLen,IsNullable,NeedTrim,RemoveQuotes,ReplacedText,ReplacementText,SpecialFormat,TreatEmptyAsNull,IsPrimaryKey,[Order]
    ,IsCalculated,IsPartitionColumn,IsMetadata,SQLType,ValidationText,[Description],SourceName,ParentSecurityPath,IsEncrypted,DataMask,ItemId)
SELECT @Entity_Id, [Name], [HiveType], NULL, 0, 0, 0, NULL, NULL, NULL, 0, 0, [Order], 0, 0, 0, [SqlType], NULL, NULL, [Name], @Attribute_ParentSecurityPath, 0, NULL, NEWID()
FROM #Entity_Attributes

INSERT INTO [DataIngestion].[Attribute]
    ([IdEntity],[Name],HiveType,MaxLen,IsNullable,NeedTrim,RemoveQuotes,ReplacedText,ReplacementText,SpecialFormat,TreatEmptyAsNull,IsPrimaryKey,[Order]
    ,IsCalculated,IsPartitionColumn,IsMetadata,SQLType,ValidationText,[Description],SourceName,ParentSecurityPath,IsEncrypted,DataMask,ItemId)
VALUES
 (@Entity_Id, 'LoadDate',       'STRING',  NULL, 0, 0, 0, NULL, NULL, 'FROM_UNIXTIME(UNIX_TIMESTAMP())', 0, 0, 500, 1, 0, 1, 'datetime2(7)', NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'HasErrors',      'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'FALSE',                           0, 0, 501, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'SidraIsDeleted', 'BOOLEAN', NULL, 0, 0, 0, NULL, NULL, 'SidraIsDeleted',                  0, 0, 502, 1, 0, 1, 'bit',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())
,(@Entity_Id, 'IdSourceItem',   'INT',     NULL, 0, 0, 0, NULL, NULL, 'IdSourceItem',                    0, 0, 503, 1, 1, 1, 'int',          NULL, NULL, NULL, @Attribute_ParentSecurityPath, 0, NULL, NEWID())

END;

-------------------------------------------------------------------------------
--  Relate Entity with Pipeline
-------------------------------------------------------------------------------
BEGIN

INSERT INTO [DataIngestion].[EntityPipeline] ([IdEntity], [IdPipeline])
    SELECT [IdEntity], [IdPipeline]
    FROM (
        SELECT @Entity_Id [IdEntity], @Pipeline_Id [IdPipeline]
        EXCEPT 
        SELECT [IdEntity], [IdPipeline] FROM [DataIngestion].[EntityPipeline]
    ) AS x

END;

-------------------------------------------------------------------------------
-- Check results
-------------------------------------------------------------------------------
SET NOCOUNT OFF;
PRINT('-------------------------------------------------------------------------------');
PRINT('-------------------------------------------------------------------------------');
SELECT * FROM [DataIngestion].[Provider]        WHERE [ProviderName]    = @Provider_Name;
SELECT * FROM [DataIngestion].[Entity]          WHERE [Name]            = @Entity_Name;
SELECT * FROM [DataIngestion].[Attribute]       WHERE [IdEntity]        = @Entity_Id;
SELECT * FROM [DataIngestion].[EntityPipeline]  WHERE [IdEntity]        = @Entity_Id;
SELECT * FROM [DataIngestion].[Asset]           WHERE [IdEntity]        = @Entity_Id;

COMMIT TRAN CreateEntity;

Info

Do not forget to populate the parameters according to your scenario and metadata names. You can download the complete script here.

Sample load from a CSV file

Once the file metadata has been configured, we can execute a test load. The following steps are required:

1. Rename the CSV file

  • The name needs to follow the regular expression mentioned in step Entity configuration.
  • The name of the file also needs to specify the date (and time).
    In our example: Bike-Store.Bikes_2022-01-28_16-30-21.csv.

2. Deposit the file in the landing zone

  • The landing zone is in the DSU Resource Group, in the Storage Account whose name contains stage.
  • Inside this account there is a container with the name landing:

(Image: the landing container inside the DSU Stage Storage Account)

  • It is highly recommended to upload the file to a path that contains the Provider and the Entity of the file (see the upload sketch below), like:
    /<name_provider>/<name_entity>/<file>.csv
    /<name_provider>/<name_entity>/<year>/<month>/<day>/<file>.csv
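
As a sketch, the renamed file can be uploaded with the Az.Storage PowerShell module. The storage account name below is a placeholder for the DSU Stage account of your deployment; the blob path follows the recommended /<name_provider>/<name_entity>/<year>/<month>/<day>/ layout.

# Upload the CSV to the landing container under the recommended folder layout.
Import-Module Az.Storage

$ctx = New-AzStorageContext -StorageAccountName 'mydsustagestorage' -UseConnectedAccount   # placeholder account

Set-AzStorageBlobContent -Context $ctx -Container 'landing' `
    -File '.\Bike-Store.Bikes_2022-01-28_16-30-21.csv' `
    -Blob 'MyFirstCsvIngestion/BikeStoreBikes/2022/01/28/Bike-Store.Bikes_2022-01-28_16-30-21.csv'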

If the metadata and file configuration are correct:

  • Sidra will automatically start loading the file into the DSU.
  • In Databricks, there will be a table named after the Provider and the Entity.
  • This table will contain the CSV information in Delta format.
  • The data will be stored in Databricks incrementally: each new CSV file that corresponds to this Entity will append new records to the corresponding table in Databricks.

Example

Here we can see an example of what could be visualized in Databricks after loading two CSV files corresponding to the same configured Entity:

  • Provider: MyFirstCsvIngestion.
  • Entity: BikeStoreBikes.
  • The sample CSV is depicted above; you may download it for a test.
  • Databricks table:

(Image: the Entity's Databricks table after the two loads)

Ingestion with record sets merges

The above instructions assume that records are simply appended to the existing ones in the Data Lake table(s).

What about merging? What if records from a new feed - from a new CSV/Excel file - are supposed to update existing records that have already been fed?

Such ingestion case would make use of primary keys:

  • The records being ingested from the CSV files would have a field (or more) declared as Primary Key.
  • The fields of the records identified with the Primary Key would be updated, if the record already exists in the target DSU (Data Storage Unit) table.

There are two requirements that need to be satisfied in order to update the records - that is, to merge the new incoming records with the ones already ingested. Without these requirements, the new incoming records will simply be appended to the existing ones, which may lead to data duplication or inconsistencies. These requirements are:

  1. The Entity must be set with { "consolidationMode": "Merge" } attribute.
  2. Among the Attributes of the Entity, one of the fields must be declared as Primary Key.

The "consolidationMode": "Merge" attribute will be an additional property of the Entity:

  • This Attribute needs to placed inside the [AdditionalProperties] field of the Entity record.
  • The [AdditionalProperties] field expects a JSON object defining extensibility options for the Entity.
  • Other Attributes may also be present in the JSON populating the [AdditionalProperties] field of the Entity record.
  • Entity records are in Sidra's Core DB, in the [DataIngestion].[Entity] table. Remember that "Entity" in this context should be seen as a data table that will hold the records in the CSV/Excel files. An Entity would be a data table definition, Attributes being its fields, or columns.

In the example SQL script defining a CSV ingestion with records update, note that the Entity's [AdditionalProperties] field will contain "consolidationMode": "Merge"; look for line:

DECLARE @Entity_Properties VARCHAR(MAX) = CONCAT('{"sourceTableName": "[staging].[', @Entity_Name, ']", "consolidationMode": "Merge"}');

Also, in the same file, notice that the first field, product_id, is added as a Primary Key Attribute: [IsPrimaryKey] = 1.

INSERT INTO #Entity_Attributes
     ([Order],      [Name],                 [HiveType],             [SqlType],              [IsPrimaryKey]  )
VALUES
     (1,            'product_id',           'INT',                  'INT',                  1               )
    ,(2,            'product_name',         'STRING',               'NVARCHAR(128)',        0               )
    ,(3,            'brand_name',           'STRING',               'NVARCHAR(32)',         0               )
    ,(4,            'category_name',        'STRING',               'NVARCHAR(64)',         0               )
    ,(5,            'model_year',           'INT',                  'SMALLINT',             0               )
    ,(6,            'list_price',           'DECIMAL(10, 2)',       'DECIMAL(10, 2)',       0               )

For experimentation purposes, you may download, customize, then use the following files:




Last update: 2022-07-14