A production-grade, metadata-driven SCD Type 1 & Type 2 processing framework built on PySpark, Delta Lake, and Azure Databricks.
Battle-tested for large-scale banking and retail data platforms.
Enterprise data warehouses across banking, insurance, and retail sectors face a persistent challenge: source systems continuously overwrite records without preserving history. A customer's address change, a product's price revision, or an account's status transition carries critical audit and analytical value β but vanishes the moment the source system updates.
Traditional SCD implementations suffer from:
- Hardcoded column lists that break when source schemas evolve
- Phantom updates caused by timestamp-only changes (no actual data change)
- No late-arriving data handling β out-of-order CDC events silently corrupt history
- Manual surrogate key management prone to collision and drift
- Tightly coupled pipeline code that cannot be reused across dimensions
- Missing observability β no metrics, no audit trail, no lineage
This framework eliminates all of the above. Every pipeline is declared as a YAML configuration file. The engine handles hashing, deduplication, late-arrival, soft deletes, schema evolution, and Delta Lake optimization β automatically.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β DATA FLOW ARCHITECTURE β
βββββββββββββββ¬βββββββββββββ¬βββββββββββββββββββ¬ββββββββββββββββ¬βββββββββββ€
β SOURCE β BRONZE β SCD ENGINE β SILVER β GOLD β
β β β β β β
β ββββββββββ β ββββββββββ β βββββββββββββββββ βββββββββββββ ββββββββββ β
β β CRM DB βββΌββRaw CSV βββΌββHash Generatorββ β SCD Type 1β ββ Star β β
β β ERP β β βParquet β β βDeduplication ββ β Table β ββSchema β β
β β Kafka β β βDelta β β βLate Arrival ββ βββββββββββββ€ ββQueriesβ β
β β APIs β β ββββββββββ β βSchema Evol. ββ β SCD Type 2β ββββββββββ β
β ββββββββββ β β βMERGE Engine ββ β w/ Historyβ β β
β β β βββββββββββββββββ βββββββββββββ β β
β β β β β β
β β β βββββββββββββββββ βββββββββββββ β β
β β β βPipeline Configββ β Quarantineβ β β
β β β β(YAML-driven) ββ β Table β β β
β β β βββββββββββββββββ βββββββββββββ β β
βββββββββββββββ΄βββββββββββββ΄βββββββββββββββββββ΄ββββββββββββββββ΄βββββββββββ
| Component | Responsibility |
|---|---|
SCDEngine |
Orchestrates the end-to-end pipeline; routes to SCD1 or SCD2 |
HashGenerator |
Computes SHA-256 fingerprints for change detection |
SCDType1Processor |
Delta MERGE for overwrite semantics (INSERT + UPDATE) |
SCDType2Processor |
Two-phase MERGE for history preservation (expire + insert) |
SCDPipelineConfig |
Immutable, YAML-loaded configuration per pipeline |
DeltaUtils |
Delta-specific helpers: CDF, time-travel, OPTIMIZE, VACUUM |
MetricsTracker |
Structured logging + Delta audit table writes |
INCREMENTAL BATCH ARRIVES
β
ββββββββββΌβββββββββ
β Schema Evolution β β Detect new columns, auto-merge
ββββββββββ¬βββββββββ
β
ββββββββββΌβββββββββ
β Deduplication β β Window fn on BK + event_timestamp
ββββββββββ¬βββββββββ
β
ββββββββββΌβββββββββ
β Late Arriving β β Quarantine or reprocess
β Data Handler β
ββββββββββ¬βββββββββ
β
ββββββββββΌβββββββββ
β SHA-256 Hash β β Generate row fingerprint
β Generation β
ββββββββββ¬βββββββββ
β
ββββββββββββββββΌβββββββββββββββ
β β
ββββββββΌβββββββ ββββββββΌβββββββ
β SCD Type 1 β β SCD Type 2 β
β MERGE β β Phase 1: β
β (Overwrite) β β Expire rows β
ββββββββ¬βββββββ ββββββββ¬βββββββ
β β
β ββββββββΌβββββββ
β β SCD Type 2 β
β β Phase 2: β
β β Insert new β
β ββββββββ¬βββββββ
β β
ββββββββββββββββ¬βββββββββββββββ
β
ββββββββββΌβββββββββ
β Post-Optimize β β OPTIMIZE ZORDER + VACUUM
ββββββββββ¬βββββββββ
β
ββββββββββΌβββββββββ
β Metrics & Audit β β Log run result to Delta table
βββββββββββββββββββ
| Technology | Version | Role |
|---|---|---|
| Apache Spark | 3.5.0 | Distributed compute engine |
| PySpark | 3.5.0 | Python API for Spark |
| Delta Lake | 3.1.0 | ACID transactions, time-travel, CDF |
| Azure Databricks | 14.3 LTS | Managed Spark platform with Photon |
| Azure Data Lake | Gen2 | Scalable object storage for Delta tables |
| Azure Data Factory | β | Pipeline orchestration & scheduling |
| Python | 3.10+ | Framework implementation language |
| PyYAML | 6.0+ | Metadata-driven config loading |
| GitHub Actions | β | CI/CD automation |
| pytest | 7.4+ | Unit and integration testing |
| Feature | SCD Type 1 | SCD Type 2 |
|---|---|---|
| INSERT new records | β | β |
| UPDATE changed records | β | β |
| Expire historical versions | β | β |
| Insert new versions | β | β |
| SHA-256 change detection | β | β |
| Soft delete handling | β | β |
| Surrogate key generation | β | β (UUID) |
| Effective date tracking | β | β |
| Current record flag | β | β |
| Late-arriving data | β | β |
| Schema evolution | β | β |
| YAML-driven config | β | β |
| Delta OPTIMIZE / VACUUM | β | β |
| Change Data Feed (CDF) | β | β |
| Metrics & audit logging | β | β |
Applied automatically via SCDPipelineConfig.apply_spark_tuning():
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")AQE dynamically coalesces post-shuffle partitions, eliminating small file overhead and skew-induced straggler tasks.
Run automatically post-write on every pipeline execution:
OPTIMIZE silver.customer_dim
ZORDER BY (customer_id, city, customer_segment);Z-ORDER clusters related data in the same Parquet files, enabling Delta's data skipping to eliminate entire file groups during predicate pushdown. Observed 15Γ query speedup on a 10M-row customer dimension after compaction.
Without hash comparison, every incremental load would re-process the entire source batch as updates β even for records where nothing actually changed. SHA-256 fingerprinting ensures only genuinely changed records trigger a MERGE action.
# Only changed records enter the MERGE path
WHEN MATCHED AND target._sha256_hash != source._sha256_hash β UPDATEThis reduces MERGE write amplification by up to 90% on low-churn dimensions.
Instead of a single expensive MERGE that attempts to handle both expirations and insertions, the framework uses a deliberate two-phase approach:
- Phase 1: Pure UPDATE (expire changed active rows) β low write cost
- Phase 2: Pure INSERT (new versions) β append-optimized
This avoids the NOT MATCHED BY SOURCE clause which forces a full table scan in single-phase implementations.
All tables are created with Databricks-optimized properties:
ALTER TABLE silver.customer_dim SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true',
'delta.enableChangeDataFeed' = 'true',
'delta.dataSkippingNumIndexedCols' = '32'
);Source-side deduplication uses a window function instead of groupBy + agg, avoiding a full shuffle:
window = Window.partitionBy(*bk_cols).orderBy(F.col(event_ts_col).desc())
df.withColumn("_row_num", F.row_number().over(window)).filter(col == 1)from pyspark.sql import SparkSession
from src.config.pipeline_config import SCDPipelineConfig
from src.scd.scd_engine import SCDEngine
spark = SparkSession.builder.appName("SCD1_Pipeline").getOrCreate()
config = SCDPipelineConfig.from_yaml("configs/product_catalog.yaml")
config.apply_spark_tuning(spark)
source_df = spark.read.format("delta").table("bronze.product_raw")
engine = SCDEngine(spark, config)
result = engine.run(source_df)
print(f"Inserted: {result.records_inserted} | Updated: {result.records_updated}")# Phase 1: Expire changed active rows
(
DeltaTable.forName(spark, "silver.customer_dim").alias("target")
.merge(
source_df.alias("source"),
"target.customer_id = source.customer_id AND target.is_current = 1"
)
.whenMatchedUpdate(
condition="target._sha256_hash != source._sha256_hash",
set={
"effective_end_date": "date_sub(source.effective_start_date, 1)",
"is_current": "0",
"_updated_at": "current_timestamp()",
}
)
.execute()
)
# Phase 2: Insert new versions
new_versions_df.write.format("delta").mode("append").saveAsTable("silver.customer_dim")from src.scd.hash_generator import HashGenerator
# Columns to track for change detection
hash_cols = ["first_name", "last_name", "email", "phone", "city", "customer_segment"]
gen = HashGenerator(columns=hash_cols)
df = gen.generate(df)
# df now has `_sha256_hash` column β 64-char hex string
# Convenience function
from src.scd.hash_generator import add_sha256_hash
df = add_sha256_hash(df, columns=hash_cols)config = SCDPipelineConfig.from_yaml("configs/customer_dim.yaml")
# Business keys β used in MERGE join condition
print(config.business_key_columns) # ["customer_id"]
# Hash columns β used for change detection
print(config.hash_columns) # ["first_name", "last_name", "email", ...]
# Z-ORDER columns β used in post-write optimization
print(config.z_order_columns) # ["customer_id", "city", "customer_segment"]Push to PR Push to main Push to main (after tests)
β β β
βΌ βΌ βΌ
βββββββββββ ββββββββββββββββ ββββββββββββββββββ
β Lint β β Unit Tests β β Build Wheel β
β Black β β (PySpark β β (python -m β
β isort β β local mode) β β build) β
β flake8 β β Coverage 80%+β βββββββββ¬βββββββββ
ββββββ¬βββββ ββββββββ¬ββββββββ β
β β βΌ
βΌ βΌ ββββββββββββββββββ
βββββββββββ ββββββββββββββββ β Deploy to DBFS β
β Config β β Integration β β Databricks CLI β
β Validateβ β Tests β β Trigger job β
βββββββββββ ββββββββββββββββ ββββββββββββββββββ
| Secret | Description |
|---|---|
DATABRICKS_HOST |
Databricks workspace URL |
DATABRICKS_TOKEN |
PAT with Jobs + DBFS permissions |
DATABRICKS_VALIDATION_JOB_ID |
Job ID of post-deploy validator |
Every run writes a record to silver.scd_pipeline_audit:
SELECT
pipeline_name,
scd_type,
records_inserted,
records_updated,
execution_time_seconds,
run_timestamp,
status
FROM silver.scd_pipeline_audit
ORDER BY run_timestamp DESC
LIMIT 20;All target tables have CDF enabled. Downstream consumers can stream changes:
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("silver.customer_dim")DESCRIBE HISTORY silver.customer_dim;
-- Shows every MERGE, OPTIMIZE, VACUUM with row-level operation metricsThe framework is designed to scale horizontally with Databricks autoscaling:
- State is stateless per run β no in-driver data accumulation
- Hash generation is a pure Spark transformation β no shuffle
- MERGE operations leverage Delta's file-level predicate pushdown
- Z-ORDER reduces data scanned per query as table grows
- Partition pruning via
partition_columnsconfig keeps incremental loads fast even at petabyte scale - AQE dynamically adjusts parallelism based on runtime statistics β no manual partition tuning required for most workloads
Tested scales: 1M β 100M records on 4-worker cluster (see benchmarks/BENCHMARK_METRICS.md).
pyspark-scd-framework/
β
βββ src/
β βββ scd/
β β βββ scd_engine.py # Main orchestrator
β β βββ scd_type1.py # SCD Type 1 MERGE processor
β β βββ scd_type2.py # SCD Type 2 two-phase processor
β β βββ hash_generator.py # SHA-256 fingerprint generator
β β
β βββ config/
β β βββ pipeline_config.py # SCDPipelineConfig dataclass
β β
β βββ utils/
β β βββ delta_utils.py # Delta Lake helpers
β β βββ logger.py # Structured logger
β β βββ metrics_tracker.py # Run metrics + audit writer
β β βββ surrogate_key.py # UUID surrogate key generator
β β
β βββ schema/ # (Future) schema registry integration
β
βββ tests/
β βββ unit/
β β βββ test_hash_generator.py # 11 unit tests for SHA-256 logic
β β βββ test_scd_type1.py # SCD1 processor unit tests
β β βββ test_scd_type2.py # SCD2 processor unit tests
β β
β βββ integration/ # End-to-end Delta Lake tests
β
βββ configs/
β βββ customer_dim.yaml # SCD Type 2: Customer dimension
β βββ product_catalog.yaml # SCD Type 1: Product master data
β
βββ github_actions/
β βββ ci_cd.yml # 7-stage GitHub Actions pipeline
β
βββ scripts/
β βββ validate_configs.py # YAML config validation CLI
β
βββ sample_data/
β βββ generate_sample_data.py # Synthetic data generator (1Kβ1M rows)
β
βββ benchmarks/
β βββ BENCHMARK_METRICS.md # Performance results & cost estimates
β
βββ notebooks/ # Databricks notebooks (exploratory)
βββ sql/ # Standalone SQL scripts
βββ docs/ # Extended documentation
β
βββ requirements.txt # Runtime dependencies
βββ requirements-dev.txt # Dev/test dependencies
βββ README.md
- Python 3.10+
- Java 11 (required for PySpark local mode)
- Git
# 1. Clone the repository
git clone https://github.com/your-org/pyspark-scd-framework.git
cd pyspark-scd-framework
# 2. Create virtual environment
python -m venv .venv
source .venv/bin/activate # macOS/Linux
.venv\Scripts\activate # Windows
# 3. Install runtime + dev dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt
# 4. Generate sample data
python sample_data/generate_sample_data.py
# 5. Validate configs
python scripts/validate_configs.py configs/# Install Databricks CLI
pip install databricks-cli
# Configure authentication
databricks configure --token
# Enter: host (https://adb-xxxx.azuredatabricks.net), token
# Upload framework wheel
pip install build && python -m build
databricks fs cp dist/*.whl dbfs:/libs/pyspark-scd-framework/ --overwrite
# Upload configs
databricks fs cp configs/ dbfs:/configs/scd-framework/ --recursive --overwriteIn your Databricks cluster β Libraries β Install from DBFS:
dbfs:/libs/pyspark-scd-framework/pyspark_scd_framework-*.whl
# In a Databricks notebook or job
from src.config.pipeline_config import SCDPipelineConfig
from src.scd.scd_engine import SCDEngine
config = SCDPipelineConfig.from_yaml("/dbfs/configs/scd-framework/customer_dim.yaml")
config.apply_spark_tuning(spark)
source_df = spark.read.format("delta").table("bronze.customer_raw")
engine = SCDEngine(spark, config)
result = engine.run(source_df)- Create
configs/your_dimension.yamlβ no code changes required. - Commit and push β CI validates the config automatically.
- Pipeline is immediately available to the engine.
# Unit tests (fast β no Delta Lake required)
pytest tests/unit/ -v --cov=src --cov-report=term-missing
# Integration tests (requires Delta Lake JARs)
pytest tests/integration/ -v
# All tests with coverage gate
pytest tests/ --cov=src --cov-fail-under=80| Feature | Priority | Notes |
|---|---|---|
| SCD Type 3 (previous value) | Medium | Add prev_<col> columns for single-step rollback |
| SCD Type 6 (hybrid 1+2+3) | Low | Full hybrid implementation for retail analytics |
| Apache Iceberg support | Medium | Multi-cloud compatibility (AWS Glue, GCP BigLake) |
| Great Expectations integration | High | Pre-MERGE data quality gates |
| Schema Registry (Confluent) | Medium | Avro/Protobuf schema enforcement at ingest |
| Streaming SCD (Spark Structured) | High | Real-time SCD from Kafka β Delta via foreachBatch |
| dbt model generation | Medium | Auto-generate dbt models from SCD2 target tables |
| Unity Catalog lineage | High | Tag source/target tables for end-to-end lineage |
| Cost attribution tagging | Low | Tag Delta tables with pipeline cost metadata |
| Backfill automation | Medium | Replay full history from source on-demand |
MIT License β free to use, modify, and distribute with attribution.