Transfer query script¶
The process for ingesting a file into Databricks is performed by an Azure Data Factory (ADF) pipeline that generates and executes two scripts:
- The table creation script, which creates the necessary database and tables in the Databricks cluster.
- The transfer query script, which reads the raw copy of the asset and inserts the information into the tables created by the previous script.
The transfer query script is generated by the custom activity GenerateTransferQuery using the information about the entity stored in the Core database. When executed by a Databricks Python activity in the pipeline, the script follows these steps:
1. Retrieve secrets¶
To access the Azure Storage account where the file to ingest is stored, the script retrieves some secrets from Databricks. Those secrets are configured in Databricks by the deployment project.
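As a minimal sketch of that lookup (the secret scope and key names here are hypothetical; the real ones are configured by the deployment project):

```python
def get_storage_secrets(dbutils, scope="storage"):
    """Retrieve the Azure Storage credentials from a Databricks secret scope.

    The scope and key names are hypothetical; the actual values are
    configured in Databricks by the deployment project.
    """
    return {
        "account_name": dbutils.secrets.get(scope=scope, key="account-name"),
        "account_key": dbutils.secrets.get(scope=scope, key="account-key"),
    }
```

`dbutils` is only available inside a Databricks runtime, which is why the sketch takes it as a parameter instead of importing it.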
2. Process input parameters¶
The script receives some input parameters from the pipeline:
- Pipeline execution identifier (RunId), used for monitoring and lineage purposes.
- Information related to the file to ingest, e.g. the date of the file and the path to the file location in Azure Storage. The values for these parameters are configured in the Databricks Python activity.
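A Databricks Python activity passes its parameters to the script as command-line arguments. A sketch of how the script might map them to names (the parameter order and names here are assumptions):

```python
def parse_parameters(argv):
    """Map the positional parameters passed by the Databricks Python
    activity to named values (the order shown here is hypothetical)."""
    return {
        "run_id": argv[0],     # ADF pipeline execution identifier (RunId)
        "file_date": argv[1],  # date of the file to ingest
        "file_path": argv[2],  # location of the file in Azure Storage
    }

# Inside the script the values would come from the command line, e.g.:
# params = parse_parameters(sys.argv[1:])
```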
3. Create staging database¶
Before inserting the data from the file into the table created by the
CreateTableScript, the data is inserted into a temporary table called the staging table. The staging table resides in the staging database, which is created in this step and whose name is the name of the database that contains the target table, with the staging_ prefix. The staging table also contains some additional columns.
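The naming convention for the staging database can be sketched as the DDL the generated script would run (a sketch; the exact statement is produced by GenerateTransferQuery):

```python
def staging_database_ddl(database: str) -> str:
    """Build the statement that creates the staging database, whose name
    is the entity's database name with the staging_ prefix."""
    return f"CREATE DATABASE IF NOT EXISTS staging_{database}"
```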
4. Load file into a DataFrame¶
The custom activity uses the entity columns and the file extension to generate the command that loads the file into a DataFrame.
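A sketch of how the load command could be selected from the file extension (the reader options shown are assumptions, not the actual generated command):

```python
def load_dataframe_command(path: str, extension: str) -> str:
    """Return the PySpark command that loads the file into a DataFrame,
    chosen from the file extension. Reader options are assumptions."""
    readers = {
        "csv": f"spark.read.option('header', 'true').csv('{path}')",
        "parquet": f"spark.read.parquet('{path}')",
        "json": f"spark.read.json('{path}')",
    }
    return readers[extension.lower()]
```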
5. Truncate old partition from same file¶
It drops the partition corresponding to the file in both the entity's table and the Validation Errors table, so that re-ingesting the same file does not duplicate data.
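A sketch of the statement involved (the partition column name file_date is an assumption):

```python
def drop_partition_sql(table: str, file_date: str) -> str:
    """Build the statement that drops the file's old partition.
    The partition column name (file_date) is an assumption."""
    return (f"ALTER TABLE {table} "
            f"DROP IF EXISTS PARTITION (file_date = '{file_date}')")
```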
6. Create temporary table from the DataFrame¶
It uses the same name as the entity's table, adding the RunId parameter as a suffix. Now that the data is registered as a table, it is possible to use SparkSQL to query it.
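The naming rule described above can be sketched as:

```python
def temp_table_name(table: str, run_id: str) -> str:
    """Name of the temporary table: the entity table name with the RunId
    parameter appended as a suffix."""
    return f"{table}_{run_id}"

# The generated script would then register the DataFrame under that name,
# e.g.: df.createOrReplaceTempView(temp_table_name("orders", run_id))
```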
7. Configure query optimization¶
It sets some configuration parameters to optimize the query.
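Such settings might look like the following sketch (these specific keys and values are assumptions, not the actual generated statements):

```python
# Hypothetical optimization settings the generated script might apply.
OPTIMIZATION_SETTINGS = {
    "hive.exec.dynamic.partition": "true",
    "hive.exec.dynamic.partition.mode": "nonstrict",
    "spark.sql.shuffle.partitions": "200",
}

def optimization_statements(settings):
    """Render each setting as a SparkSQL SET statement."""
    return [f"SET {key}={value}" for key, value in settings.items()]
```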
8. Insert into Staging table¶
It uses a SparkSQL query generated by the custom activity to copy the data to the staging table. The name of the staging table is the same as the temporary table's, with the suffix _tempFileView added. While copying the data to the staging table, the SparkSQL query also performs additional validation.
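The staging table naming rule described above can be sketched as:

```python
def staging_table_name(temp_table: str) -> str:
    """The staging table name is the temporary table name with the
    _tempFileView suffix added."""
    return f"{temp_table}_tempFileView"
```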
9. Create table insert¶
It fills the Databricks table for the entity with data from the staging table. If the encryption attribute is enabled, the data is encrypted.
10. Create validation errors table insert¶
It fills the Validation Errors table with data from the staging table. In this case, only the rows that have errors are inserted.
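A sketch of the insert that copies only the failing rows (the validation_errors column name is an assumption; the real query is generated by the custom activity):

```python
def validation_errors_insert_sql(staging_table: str, errors_table: str) -> str:
    """Build an insert that copies only the rows that have errors.
    The validation_errors column name is an assumption."""
    return (f"INSERT INTO {errors_table} "
            f"SELECT * FROM {staging_table} "
            f"WHERE validation_errors IS NOT NULL")
```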
11. Drop Staging table¶
At this point, both Staging tables are dropped, as they are no longer needed.
12. Update number of entities¶
It counts the number of entities in the file and makes a request to the API to update the file information.
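The request body for that API call might be built along these lines (the field names are assumptions; the actual API contract is defined elsewhere):

```python
import json

def entity_count_payload(file_id: str, entity_count: int) -> str:
    """Build the JSON body for the request that updates the file
    information with the row count. Field names are assumptions."""
    return json.dumps({"fileId": file_id, "entityCount": entity_count})
```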
13. Update number of validation errors¶
Finally, similarly to the previous step, it counts the number of validation errors and updates the file information by making a request to the API.