Transfer query script

The process for ingesting a file into Databricks is performed by an Azure Data Factory (ADF) pipeline that generates and executes two scripts:

  • The table creation script creates the necessary database and tables in the Databricks cluster.
  • The transfer query script reads the raw copy of the asset and inserts the information in the tables created by the previous script.

This transfer query script is generated by the GenerateTransferQuery custom activity using the information about the Entity stored in the Core database. When executed by a Databricks Python activity in the pipeline, the script follows these steps:

Step 1. Retrieve secrets

In order to access the Azure Storage account where the file to ingest is stored, the script retrieves some secrets from Databricks. Those secrets are configured in Databricks by the deployment project.
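A minimal sketch of this step, assuming a hypothetical secret scope and key names (the real ones are defined by the deployment project):

# Hypothetical secret scope and key names; the real ones are configured in
# Databricks by the deployment project.
storage_account_name = dbutils.secrets.get(scope="resources", key="storage_account_name")
storage_account_key = dbutils.secrets.get(scope="resources", key="storage_account_key")

# Make the key available to Spark so the file can be read from Azure Storage.
spark.conf.set(
    "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name),
    storage_account_key)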

Step 2. Process input parameters

The script receives some input parameters from the pipeline, as sketched after this list:

  • The pipeline execution identifier (RunId), used for monitoring and lineage purposes.
  • Information related to the file to ingest, e.g. the date of the file and the path to the file location in Azure Storage. The values for these parameters are configured in the Databricks Python activity.
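A Databricks Python activity passes its parameters to the script as command-line arguments, so the script can read them roughly like this (the parameter order is illustrative):

import sys

# Illustrative parameter order; the actual order is defined by the Databricks
# Python activity in the pipeline.
run_id = sys.argv[1]     # Pipeline execution identifier (RunId)
file_date = sys.argv[2]  # Date of the file to ingest
file_path = sys.argv[3]  # Path to the file location in Azure Storage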

Step 3. Create staging database

Before inserting the data from the file into the table created by the CreateTableScript, the data is inserted into a temporary table named the staging table. The staging table resides in the staging database, which is created in this step and whose name is the same as the database that contains the table, with the staging_ prefix. This table contains some additional columns, such as HasErrors or LoadDate.
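A minimal sketch, using a hypothetical database name:

# Hypothetical database name; the staging database name is the Entity's database
# name with the staging_ prefix.
database_name = "mydatabase"
spark.sql("CREATE DATABASE IF NOT EXISTS staging_{0}".format(database_name))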

Step 4. Load file into a DataFrame

The custom activity uses the Entity columns SerdeProperties, HeaderLines, FieldDelimiter and Extension to generate the command to load the file into a DataFrame.
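For a delimited text file, the generated load command could look like the following sketch, where the header and delimiter values would be derived from the Entity columns mentioned above and the path comes from the input parameters:

# Hypothetical values; in the real script they are derived from the Entity metadata
# (HeaderLines, FieldDelimiter, Extension, SerdeProperties) and the input parameters.
file_path = "wasbs://landing@mystorageaccount.blob.core.windows.net/myentity/2020/01/01/file.csv"
header_lines = 1
field_delimiter = ","

df = (spark.read
      .option("header", header_lines > 0)
      .option("sep", field_delimiter)
      .csv(file_path))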

Step 5. Truncate old partition from same file

The partition corresponding to the file is dropped from the Entity's table and from the Validation Errors table, so that re-ingesting the same file does not duplicate data.
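Assuming the tables are partitioned by the file date (hypothetical table and partition names), the generated statements could look like this:

# Hypothetical table, partition column and date; the real values come from the
# Entity metadata and the input parameters.
file_date = "2020-01-01"
spark.sql("ALTER TABLE mydatabase.myentity DROP IF EXISTS PARTITION (filedate = '{0}')".format(file_date))
spark.sql("ALTER TABLE mydatabase.validationerrors DROP IF EXISTS PARTITION (filedate = '{0}')".format(file_date))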

Step 6. Create temporary table from the DataFrame

The temporary table uses the same name as the Entity table but adding the RunId parameter as a suffix. Now that the data is registered as a table, it is possible to use SparkSQL to query it.
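A sketch of this step, reusing the DataFrame loaded in step 4 and hypothetical names:

# Hypothetical names; df is the DataFrame loaded in step 4.
table_name = "myentity"
run_id = "1234"

# Register the DataFrame as a temporary view so it can be queried with SparkSQL.
df.createOrReplaceTempView("{0}_{1}".format(table_name, run_id))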

Step 7. Configure query optimization

Some parameters to optimize the query are set up during this step, for example:

spark.sql('SET hive.exec.dynamic.partition = true')
spark.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
spark.sql('SET hive.optimize.sort.dynamic.partition = false')

Step 8. Insert into Staging table

A SparkSQL query generated by the custom activity to copy the data to the Staging table is executed. The name of the Staging table is the same as the temporary table but with the suffix _tempFileView added. While the SparkSQL query copies the data to the Staging table, it also performs additional validations.
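The generated query could look roughly like the following sketch; the table names simply follow the naming described above, and the validation expression is illustrative, not the actual generated code:

# Illustrative names following the naming described above: the temporary view is
# <table>_<RunId> and the Staging table adds the _tempFileView suffix.
temp_table = "myentity_1234"
staging_table = "staging_mydatabase.{0}_tempFileView".format(temp_table)

spark.sql("""
    INSERT INTO {staging}
    SELECT
        Column1,
        Column2,
        -- Illustrative validation: flag rows with a missing mandatory value
        CASE WHEN Column1 IS NULL THEN true ELSE false END AS HasErrors,
        current_timestamp() AS LoadDate
    FROM {temp}
""".format(staging=staging_table, temp=temp_table))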

Step 9. Create table insert

The Databricks table for the Entity is filled with data from the Staging table. If the encryption attribute is enabled, the data is encrypted during this insert.
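A rough sketch of this insert, with illustrative names and without showing the encryption logic:

# Illustrative names; encryption of the selected columns would be applied here
# when the encryption attribute of the Entity is enabled (not shown).
spark.sql("""
    INSERT INTO mydatabase.myentity PARTITION (filedate)
    SELECT Column1, Column2, '2020-01-01' AS filedate
    FROM staging_mydatabase.myentity_1234_tempFileView
""")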

Step 10. Create validation errors table insert

The Validation Errors table is filled with data from the Staging table. In this case, only the rows which have errors are inserted.
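A similar sketch for the Validation Errors table, keeping only the rows flagged with errors:

# Only the rows flagged with errors are copied (illustrative names).
spark.sql("""
    INSERT INTO mydatabase.validationerrors PARTITION (filedate)
    SELECT Column1, Column2, LoadDate, '2020-01-01' AS filedate
    FROM staging_mydatabase.myentity_1234_tempFileView
    WHERE HasErrors = true
""")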

Step 11. Drop Staging table

At this point, both Staging tables are dropped, as they are no longer needed.
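For example, interpreting the two staging objects as the temporary view and the Staging table from the earlier sketches:

# Illustrative names from the earlier sketches.
spark.catalog.dropTempView("myentity_1234")
spark.sql("DROP TABLE IF EXISTS staging_mydatabase.myentity_1234_tempFileView")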

Step 12. Update number of entities

This step counts the number of Entities in the file and makes a request to the API to update the file information.

Step 13. Update number of validation errors

Finally, and similarly to the previous step, the number of validation errors is counted and the file information is updated by making a request to the API.
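A sketch covering these last two steps in a single call, with a hypothetical API endpoint, file identifier, payload and authentication (the real API contract is not described on this page, and the actual script makes one request per update):

import requests

# Count the ingested Entities and the validation errors (illustrative names).
entities_count = spark.sql(
    "SELECT COUNT(*) AS c FROM mydatabase.myentity WHERE filedate = '2020-01-01'"
).collect()[0]["c"]
errors_count = spark.sql(
    "SELECT COUNT(*) AS c FROM mydatabase.validationerrors WHERE filedate = '2020-01-01'"
).collect()[0]["c"]

# Hypothetical endpoint, identifier and token; placeholders only, not the real API.
file_id = "00000000-0000-0000-0000-000000000000"
api_token = dbutils.secrets.get(scope="api", key="token")
requests.put(
    "https://core-api.example.com/api/files/{0}".format(file_id),
    json={"entities": entities_count, "validationErrors": errors_count},
    headers={"Authorization": "Bearer " + api_token})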