Transfer query script

The process for ingesting a file into Databricks is performed by an Azure Data Factory (ADF) pipeline that generates and executes two scripts:

  • The table creation script. It creates the necessary database and tables in the Databricks cluster.
  • The transfer query script. It reads the raw copy of the asset and inserts the information into the tables created by the previous script.

The transfer query script is generated by the GenerateTransferQuery custom activity using the entity information stored in the Core database. The script, when executed by a Databricks Python activity in the pipeline, follows these steps:

1. Retrieve secrets

To access the Azure Storage account where the file to ingest is stored, the script retrieves some secrets from Databricks. These secrets are configured in Databricks by the deployment project.
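
A minimal sketch of this step, assuming a secret scope named ingestion, an illustrative key name and storage account, and the dbutils and spark objects provided by the Databricks runtime:

# Scope, key and storage account names are placeholders; the real values are
# configured in Databricks by the deployment project.
storage_account_key = dbutils.secrets.get(scope='ingestion', key='storage-account-key')
spark.conf.set(
    'fs.azure.account.key.mystorageaccount.blob.core.windows.net',
    storage_account_key)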

2. Process input parameters

The script receives some input parameters from the pipeline:

  • The pipeline execution identifier (RunId), used for monitoring and lineage purposes.
  • Information related to the file to ingest, e.g. the date of the file, the path to the file location in Azure Storage... The values for these parameters are configured in the Databricks Python activity, as shown in the sketch below.
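
A sketch of the parameter handling, assuming the parameters arrive as named command-line arguments (the parameter names are illustrative):

import argparse

# Parameter names are illustrative; the real names are defined in the
# Databricks Python activity of the ADF pipeline.
parser = argparse.ArgumentParser()
parser.add_argument('--run-id', required=True, help='ADF pipeline execution identifier (RunId)')
parser.add_argument('--file-date', required=True, help='Date of the file to ingest')
parser.add_argument('--file-path', required=True, help='Path of the file in Azure Storage')
args = parser.parse_args()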

3. Create staging database

Before inserting the data from the file into the table created by the CreateTableScript, the data is inserted into a temporary table known as the staging table. The staging table resides in the staging database, which is created in this step and whose name is that of the database containing the target table, prefixed with staging_. The staging table contains some additional columns, such as HasErrors or LoadDate.
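
A sketch of the database creation, using illustrative entity names (the real database and table names come from the Core database metadata):

# Illustrative names; the real ones are read from the Core database.
database_name = 'sales'
table_name = 'orders'
staging_database = f'staging_{database_name}'

spark.sql(f'CREATE DATABASE IF NOT EXISTS {staging_database}')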

4. Load file into a DataFrame

The custom activity uses the entity columns SerdeProperties, HeaderLines, FieldDelimiter and Extension to generate the command to load the file into a DataFrame.
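
For a delimited text file, the generated command could look roughly like this; the concrete reader and options depend on the entity metadata, and the delimiter and header settings below are assumptions:

# The options map to the entity columns: 'delimiter' to FieldDelimiter,
# 'header' to HeaderLines, and the chosen reader to Extension/SerdeProperties.
df = (spark.read
      .option('header', 'true')
      .option('delimiter', '|')
      .csv(args.file_path))  # path received as an input parameter in step 2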

5. Truncate old partition from same file

It drops the partition in the entity's table and in the Validation Errors table.
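
A sketch of the partition drop, assuming the tables are partitioned by the file date and keeping the illustrative names from the previous sketches (the Validation Errors table name is also an assumption):

# Remove any data previously ingested from the same file before reloading it.
spark.sql(f"ALTER TABLE {database_name}.{table_name} "
          f"DROP IF EXISTS PARTITION (filedate = '{args.file_date}')")
spark.sql(f"ALTER TABLE {database_name}.{table_name}_validationerrors "
          f"DROP IF EXISTS PARTITION (filedate = '{args.file_date}')")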

6. Create temporary table from the DataFrame

The temporary table uses the same name as the entity table, with the RunId parameter appended as a suffix. Now that the data is available as a table, it is possible to use SparkSQL to query it.
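
A sketch of this step, assuming the RunId is normalised into a valid identifier before being used as a suffix:

# The temporary view name is the entity table name with the RunId appended.
run_id = args.run_id.replace('-', '_')  # ADF RunIds are GUIDs, so dashes are removed
temp_table_name = f'{table_name}_{run_id}'
df.createOrReplaceTempView(temp_table_name)

# The data can now be queried with SparkSQL, for example:
spark.sql(f'SELECT COUNT(*) FROM {temp_table_name}')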

7. Configure query optimization

It sets some parameters to optimize the query, for example:

spark.sql('SET hive.exec.dynamic.partition = true')
spark.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
spark.sql('SET hive.optimize.sort.dynamic.partition = false')

8. Insert into Staging table

It uses a SparkSQL query generated by the custom activity to copy the data to the Staging table. The name of the Staging table is the same as that of the temporary table, with the suffix _tempFileView appended. While copying the data to the Staging table, the SparkSQL query also performs additional validation.
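
A rough sketch of the generated insert, flagging rows that fail a hypothetical not-null validation on an order_id column; the validation rules and column names are assumptions, since the real query is generated from the entity metadata:

# Staging table name following the convention described above.
staging_table = f'{staging_database}.{temp_table_name}_tempFileView'

spark.sql(f"""
    INSERT INTO {staging_table}
    SELECT
        t.*,
        CASE WHEN t.order_id IS NULL THEN true ELSE false END AS HasErrors,
        current_timestamp() AS LoadDate
    FROM {temp_table_name} t
""")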

9. Create table insert

It fills the Databricks table for the entity with data from the Staging table. If the encryption attribute is enabled, the data is encrypted.
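
An illustrative version of this insert, where encrypt_value is a placeholder registered here only for the sketch; the real encryption mechanism, the encryption_enabled flag and the column names are all assumptions:

# Hypothetical stand-in for the real encryption routine.
spark.udf.register('encrypt_value', lambda v: None if v is None else '***' + str(v))

encryption_enabled = True  # would come from the entity's encryption attribute

if encryption_enabled:
    select_list = 'encrypt_value(t.customer_name) AS customer_name, t.order_id, t.amount'
else:
    select_list = 't.customer_name, t.order_id, t.amount'

spark.sql(f"""
    INSERT INTO {database_name}.{table_name}
    SELECT {select_list}
    FROM {staging_table} t
""")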

10. Create validation errors table insert

It fills the Validation Errors table with data from the Staging table. In this case, only the rows that have errors are inserted.
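
A sketch of the validation-errors insert, continuing the illustrative names used above:

# Only the rows flagged during the staging insert are copied.
spark.sql(f"""
    INSERT INTO {database_name}.{table_name}_validationerrors
    SELECT t.*
    FROM {staging_table} t
    WHERE t.HasErrors = true
""")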

11. Drop Staging table

At this point, both Staging tables are dropped, as they are no longer needed.
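
A sketch of the clean-up, dropping the staging table and the temporary view created earlier:

spark.sql(f'DROP TABLE IF EXISTS {staging_table}')
spark.catalog.dropTempView(temp_table_name)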

12. Update number of entities

It counts the number of entities in the file and makes a request to the API to update the file information.

13. Update number of validation errors

Finally, and similarly to the previous step, it counts the number of validation errors and updates the file information by making a request to the API.
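
A combined sketch of these last two steps, counting the ingested rows and the validation errors and sending both figures to the API; the endpoint URL, the identifier used in it and the payload fields are placeholders for whatever the Core API actually exposes:

import requests

# Count the rows loaded for this file and the rows that failed validation.
entity_count = spark.sql(
    f"SELECT COUNT(*) FROM {database_name}.{table_name} "
    f"WHERE filedate = '{args.file_date}'").collect()[0][0]
error_count = spark.sql(
    f"SELECT COUNT(*) FROM {database_name}.{table_name}_validationerrors "
    f"WHERE filedate = '{args.file_date}'").collect()[0][0]

# Hypothetical endpoint: update the file information in the Core database.
requests.patch(
    f'https://core-api.example.com/files/{args.run_id}',
    json={'entities': entity_count, 'validationErrors': error_count})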