Pipeline: fileIngestion-databricks

The fileIngestion-databricks pipeline implements step 5 of How an asset flows into the platform. It performs the file ingestion of the raw copy of the file into the Data Lake, based on the source metadata (Entity, Attribute...).

Definition

The pipeline uses the AutoGeneratedImportFile pipeline template:

{
    "name": "##name##",
    "description": "Generated pipeline for ##name##",
    "properties": {
        "parameters": {
            "fileDate": {
                "type": "DateTime"
            },
            "fileReferences": {
                "type": "String"
            },
            "input": {
                "type": "String"
            },
            "fileId": {
                "type": "Int"
            },
            "tableName": {
                "type": "String"
            },
            "providerName": {
                "type": "String"
            },
            "providerId": {
                "type": "Int"
            }
        },
        "activities": [##Activities##]
    }
}
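To illustrate how such a template could be turned into a concrete pipeline definition, here is a minimal sketch of placeholder substitution. The rendering function and the abbreviated template below are assumptions for illustration only; Sidra's actual generation mechanism may differ.

```python
import json

# Abbreviated copy of the template above; only one parameter is kept for brevity.
TEMPLATE = """{
    "name": "##name##",
    "description": "Generated pipeline for ##name##",
    "properties": {
        "parameters": {
            "fileDate": {"type": "DateTime"}
        },
        "activities": [##Activities##]
    }
}"""

def render_template(name, activities):
    """Replace the ##...## placeholders and parse the result as JSON.
    Hypothetical helper, not Sidra's real implementation."""
    rendered = TEMPLATE.replace("##name##", name)
    rendered = rendered.replace(
        "##Activities##", ",".join(json.dumps(a) for a in activities)
    )
    return json.loads(rendered)

pipeline = render_template(
    "fileIngestion-databricks",
    [{"name": "CheckDatesAndDropCreate", "type": "Lookup"}],
)
```

The placeholders are plain string markers, so any activity list serialized as JSON objects can be spliced into the `activities` array.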

It is associated with these dataset templates: LookupSQLDataset

It is associated with these activity templates in this specific order:

  1. Lookup
  2. SetAssetStatusGenerated
  3. CheckLastUpdated
  4. RunPythonScript
  5. SetAssetStatusGenerated

[Diagram: pipeline-file-ingestion-databricks]

How does it work

Pipeline launch

The pipeline is executed by the Sidra API when a file is registered. The pipeline parameters are populated with information retrieved from the Core database about the file and the entity associated with it.
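The parameter set matches the template definition above. A hypothetical invocation payload could look like the following; all values (paths, names, ids) are illustrative, not taken from a real run.

```python
from datetime import datetime

# Illustrative values only; the Sidra API builds this payload from the
# Core database when the file is registered.
pipeline_parameters = {
    "fileDate": datetime(2023, 5, 1, 10, 30).isoformat(),
    "fileReferences": "raw/exampleprovider/file.csv",  # hypothetical path
    "input": "file.csv",
    "fileId": 1234,
    "tableName": "exampleprovider.entity_table",  # hypothetical table
    "providerName": "ExampleProvider",
    "providerId": 42,
}
```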

CheckDatesAndDropCreate activity

This native Lookup activity uses the CoreAzureSQLDatabase dataset -which uses the LookupSQLDataset template- to access the Core database and select the LastUpdated, LastDeployed, DropAndCreateORCTableOnChange and EntityId values for the file referenced by the pipeline parameters.
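A lookup query of roughly this shape could retrieve those values. The table and column layout below is an assumption inferred from the fields described, not the actual Core database schema.

```python
def build_lookup_query(file_id):
    """Assemble the SELECT the Lookup activity might issue against the
    Core database. Table/join names are hypothetical."""
    return (
        "SELECT e.LastUpdated, e.LastDeployed, "
        "e.DropAndCreateORCTableOnChange, a.EntityId "
        "FROM Asset a JOIN Entity e ON e.Id = a.EntityId "
        f"WHERE a.Id = {int(file_id)}"
    )
```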

ChangeAssetStatus-From-1-to-7 activity

This native Stored Procedure activity executes the ChangeAssetStatus procedure in the Core database. The procedure updates the AssetStatus from 1 -LoadToAzureBlobStorage- to 7 -ImportingInDataLake- for the file referenced by the pipeline parameters.

CheckLastUpdated activity

This native If Condition activity checks whether the LastUpdated date is later than the LastDeployed date -both values are received from the CheckDatesAndDropCreate activity. If so, it executes the generate-transfer-query pipeline.
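The comparison itself is simple; a sketch of the condition the If Condition activity evaluates (the handling of a never-deployed entity is an assumption):

```python
from datetime import datetime
from typing import Optional

def needs_transfer_query_regeneration(
    last_updated: datetime, last_deployed: Optional[datetime]
) -> bool:
    """True when the entity metadata changed after the last deployment,
    meaning the generate-transfer-query pipeline must run again."""
    if last_deployed is None:  # assumption: never deployed yet
        return True
    return last_updated > last_deployed
```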

RunTransferQuery activity

This native Databricks Python activity executes the Python script that contains the transfer query and that was generated by the previous activity.
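The generated script loads the raw file into the corresponding Data Lake table. A much-simplified sketch of the kind of transfer query such a script might assemble follows; the real script is generated per entity from its metadata, and the statement shape, staging table and system columns shown here are hypothetical.

```python
def build_transfer_query(table_name, file_id, file_date):
    """Assemble a simplified Spark SQL INSERT of the kind a generated
    transfer script might run. Illustrative only."""
    return (
        f"INSERT INTO {table_name} "
        f"SELECT *, {int(file_id)} AS IdSourceItem, "  # hypothetical column
        f"'{file_date}' AS FileDate "
        f"FROM staging_{int(file_id)}"  # hypothetical staging table
    )
```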

ChangeAssetStatus-From-7-to-2 activity

This native Stored Procedure activity executes the ChangeAssetStatus procedure using different parameters to update the AssetStatus from 7 -ImportingInDataLake- to 2 -MovedToDataLake- for the file referenced by the pipeline parameters.
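The two ChangeAssetStatus calls bracket the Databricks run. A sketch of the status codes and the transitions this pipeline performs; the codes are taken from the text above, while the validation logic is illustrative (the real ChangeAssetStatus is a stored procedure in the Core database):

```python
# Asset status codes as described in this document.
LOAD_TO_AZURE_BLOB_STORAGE = 1
MOVED_TO_DATA_LAKE = 2
IMPORTING_IN_DATA_LAKE = 7

# Transitions performed by this pipeline (illustrative validation only).
ALLOWED_TRANSITIONS = {
    (LOAD_TO_AZURE_BLOB_STORAGE, IMPORTING_IN_DATA_LAKE),
    (IMPORTING_IN_DATA_LAKE, MOVED_TO_DATA_LAKE),
}

def change_asset_status(current, new):
    """Return the new status if the transition is one this pipeline makes,
    otherwise raise."""
    if (current, new) not in ALLOWED_TRANSITIONS:
        raise ValueError(f"Unexpected asset status transition {current} -> {new}")
    return new
```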