05 Spark Notebooks
This accelerator includes pre-built Spark Notebooks that can be invoked from data factory pipelines with minimal configuration updates. Our primary design principle is to write reusable code for repeatable data patterns, such as upserts, Type 2 SCD, replacing empty values with defaults, time zone conversions, and so on.
CommonTransforms is a Python class that uses PySpark to apply common transformations to a Spark dataframe. While PySpark has built-in capabilities to cleanse and enrich data at the column level, this library extends PySpark's functionality to the dataframe scope. Consequently, common activities like trimming and replacing empty values with defaults can be performed with a single function call for the entire dataframe, rather than calling multiple functions for each column. This class is adapted from pyspark-utils.
Supported functions at the dataframe scope include:
- trim: Removes leading and trailing spaces from all string columns in the dataframe.
- replaceNull: Replaces null values in the dataframe with a default value, which can be applied to all columns or a subset of columns specified as a list. The default value can be numeric, string, date, timestamp, boolean, or a dictionary object. When a dictionary object is used, custom default values can be applied to specified columns, but only to columns of the same data type.
- deDuplicate: Deletes duplicate records from the dataframe, with an option to consider a subset of key columns.
- utc_to_local: Converts all or a subset of timestamp columns from UTC to the local timezone.
- local_to_utc: Converts all or a subset of timestamp columns from the local timezone to UTC.
- changeTimezone: Converts all or selected timestamps in the dataframe from one timezone to another.
- dropSysColumns: Drops columns that are either system-related or non-business from the dataframe.
- addChecksumCol: Creates a checksum column using all columns of the dataframe.
- julian_to_calendar: Converts a 5-digit or 7-digit Julian date to a calendar date.
- addLitCols: Adds a set of literal value columns to the dataframe, passed as a dictionary parameter (e.g., adding audit columns to a dataframe).
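The snippet below is a minimal, self-contained sketch of the dataframe-scope idea behind these helpers; it is not the notebook's actual implementation, and the table and column names are illustrative only.

```python
# Sketch of dataframe-scope helpers: apply trim / replaceNull across every
# matching column with a single call (illustrative, not the notebook's code).
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

def trim_all(df: DataFrame) -> DataFrame:
    """Trim leading/trailing spaces from every string column."""
    for field in df.schema.fields:
        if field.dataType.simpleString() == "string":
            df = df.withColumn(field.name, F.trim(F.col(field.name)))
    return df

def replace_null_all(df: DataFrame, default, subset=None) -> DataFrame:
    """Replace nulls with a default value, optionally limited to a column subset."""
    return df.fillna(default, subset=subset)

spark = SparkSession.builder.getOrCreate()
raw = spark.createDataFrame([("  Ada ", None), ("Linus", "AU")], ["name", "country"])
clean = replace_null_all(trim_all(raw), "Unknown", ["country"])
clean.show()
```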
DeltaLakeFunctions is a collection of functions for common Lakehouse tasks, such as reading, writing, and upserting data to a Lakehouse table.
Supported functions include:
- getAbfsPath: Gets the Azure Blob File System (ABFS) path of the OneLake Medallion layer.
- readFile: Reads a data file from OneLake and returns it as a Spark dataframe.
- readMedallionLHTable: Retrieves a Lakehouse Table from the medallion layers, allowing for table filtering and specific column selection.
- readLHTable: Retrieves any Lakehouse table from OneLake, allowing for table filtering and specific column selection. Use this function to read from any Lakehouse in OneLake outside the data platform, e.g., a mirrored database or a Fabric SQL database.
- insertDelta: Inserts a dataframe into a Lakehouse table as an append or overwrite. If the table doesn't already exist, a new one is created with the schema of the dataframe.
- upsertDelta: Inserts or updates a dataframe in a Lakehouse table. Inserts new records and updates existing records if the version is newer than the existing one.
- getHighWaterMark: Retrieves the high watermark from a Lakehouse table for a given date range.
- optimizeDelta: Compacts small files and removes unused files beyond the default retention period for a Delta table.
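The sketch below shows the insert/upsert pattern these helpers wrap, using the Delta Lake merge API directly. It is illustrative only: the table name, key column, and create-if-missing behaviour are assumptions, not the notebook's exact code.

```python
# Insert when the target table is missing, otherwise merge on a business key
# (illustrative sketch; table and key names are hypothetical).
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
updates = spark.read.table("bronze.sales_increment")   # hypothetical source table
target_name = "silver.sales"                           # hypothetical target table

if not spark.catalog.tableExists(target_name):
    # insertDelta-style behaviour: create the table from the dataframe's schema
    updates.write.format("delta").saveAsTable(target_name)
else:
    # upsertDelta-style behaviour: update matches, insert new records
    target = DeltaTable.forName(spark, target_name)
    (target.alias("t")
           .merge(updates.alias("s"), "t.sale_id = s.sale_id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())
```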
entra-functions is a collection of functions for common Entra tasks, such as obtaining a bearer token.
Supported functions include:
- getBearerToken: Returns a bearer token for service principal Entra authentication.
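For context, the snippet below sketches the standard OAuth2 client-credentials flow that a function like getBearerToken typically wraps. It is not the notebook's implementation, and the scope value is an assumption.

```python
import requests

def get_bearer_token_sketch(tenant_id: str, client_id: str, client_secret: str,
                            scope: str = "https://api.fabric.microsoft.com/.default") -> str:
    """Minimal client-credentials flow against Entra ID (illustrative only)."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }
    resp = requests.post(url, data=payload)
    resp.raise_for_status()
    return resp.json()["access_token"]
```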
This notebook iterates through all tables in the Lakehouse and runs OPTIMIZE and VACUUM commands. It is recommended to schedule this notebook weekly to keep your Lakehouses healthy and performing optimally by addressing the problem of small files and removing unused historical versions.
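A minimal sketch of this maintenance pattern is shown below; it assumes the notebook has a default Lakehouse attached so that SHOW TABLES lists its tables.

```python
# Iterate over the Lakehouse tables and compact/clean each one
# (illustrative sketch of the maintenance pattern described above).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for row in spark.sql("SHOW TABLES").collect():
    table = row["tableName"]
    spark.sql(f"OPTIMIZE {table}")   # compact small files
    spark.sql(f"VACUUM {table}")     # remove unused files beyond the default retention period
```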
This notebook exemplifies a typical Level 1 Transformation.
- It calls functions from the CommonTransforms notebook to cleanse, enrich, and curate raw data from the OneLake bronze layer.
- It then calls functions from the DeltaLakeFunctions notebook to insert, update, or upsert data into the Lakehouse in the OneLake silver layer.
- The number of rows impacted is returned to the calling data factory pipeline.
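The sketch below illustrates this Level 1 flow end to end; the table and column names are hypothetical, and the cleanse step is inlined here rather than calling the CommonTransforms notebook.

```python
# Level 1 flow sketch: cleanse bronze data, load to silver, and return the
# row count to the calling Data Factory pipeline (illustrative only).
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Read raw data from the bronze layer (hypothetical table name)
bronze_df = spark.read.table("bronze.customers")

# Cleanse/enrich: trim a string column and default missing countries
clean_df = (bronze_df
            .withColumn("name", F.trim(F.col("name")))
            .fillna("Unknown", subset=["country"]))

# Load to the silver layer and capture the number of rows impacted
rows_affected = clean_df.count()
clean_df.write.format("delta").mode("overwrite").saveAsTable("silver.customers")

# Return the row count to the calling pipeline
mssparkutils.notebook.exit(str(rows_affected))
```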
Use the EnvSettings notebook to set environment-specific (prod/non-prod) variables that can be referenced in other Spark notebooks. These variables need to be set once in each environment:
- bronzeWorkspaceId: Fabric Workspace ID for the Bronze medallion layer.
- bronzeLakehouseName: Bronze Lakehouse name. Set to None if not applicable.
- bronzeDatawarehouseName: Bronze data warehouse name. Set to None if not applicable.
- silverWorkspaceId: Fabric Workspace ID for the Silver medallion layer. Use the same ID if all medallion layers are in the same workspace.
- silverLakehouseName: Silver Lakehouse name. Set to None if not applicable.
- silverDatawarehouseName: Silver data warehouse name. Set to None if not applicable.
- goldWorkspaceId: Fabric Workspace ID for the Gold medallion layer. Use the same ID if all medallion layers are in the same workspace.
- goldLakehouseName: Gold Lakehouse name. Set to None if not applicable.
- goldDatawarehouseName: Gold data warehouse name. Set to None if not applicable.
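A minimal sketch of such an EnvSettings notebook is shown below; the workspace ID and Lakehouse names are placeholders to be replaced per environment.

```python
# EnvSettings sketch: assign environment-specific values once per environment
# so downstream notebooks can reference them (placeholder values shown).
bronzeWorkspaceId       = "00000000-0000-0000-0000-000000000000"
bronzeLakehouseName     = "LH_Bronze"
bronzeDatawarehouseName = None                 # None if not applicable

silverWorkspaceId       = bronzeWorkspaceId    # same workspace for all layers in this example
silverLakehouseName     = "LH_Silver"
silverDatawarehouseName = None

goldWorkspaceId         = bronzeWorkspaceId
goldLakehouseName       = "LH_Gold"
goldDatawarehouseName   = None
```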