You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
# Basic DML against the Delta table `silver_customers`, issued through Spark SQL.
# `spark` is expected to be the SparkSession supplied by the notebook runtime.

# UPDATE: set status to 'active' for customers whose last_login is after the cutoff.
spark.sql("""
    UPDATE silver_customers
    SET status = 'active'
    WHERE last_login > '2024-01-01'  -- Example date, adjust as needed
""")

# DELETE: remove every row whose is_deleted flag is true.
spark.sql("""
    DELETE FROM silver_customers
    WHERE is_deleted = true
""")

# MERGE (Upsert): rows are matched on customer_id. Matched rows are updated
# from the staging row; unmatched staging rows are inserted.
# `UPDATE SET *` / `INSERT *` is the Delta shorthand for "all columns"
# (assumes staging_customers and silver_customers share column names — verify).
spark.sql("""
    MERGE INTO silver_customers AS target
    USING staging_customers AS source
        ON target.customer_id = source.customer_id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")
%%sql
-- Query Delta table directly:
-- one row per customer with its row count and summed amount from gold_orders,
-- keeping only the 10 customers with the highest total_amount.
SELECT
    customer_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM gold_orders
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 10
V-Order Optimization
# Enable V-Order for read optimization.
# Sets the property on the active Spark session's runtime configuration.
# NOTE(review): the V-Order property name may differ between runtime versions —
# confirm "spark.sql.parquet.vorder.enabled" is the one your runtime honors.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
Table Optimization
%%sql
-- Maintenance commands for the Delta table silver_transactions.
-- Each statement is terminated explicitly so the statements cannot run together.
-- NOTE(review): if this notebook's %%sql magic accepts only one statement per
-- cell, run each statement below in its own cell.

-- Optimize table (compact small files)
OPTIMIZE silver_transactions;

-- Optimize with Z-ordering on query columns
OPTIMIZE silver_transactions ZORDER BY (customer_id, transaction_date);

-- Vacuum old files (default 7 days retention)
VACUUM silver_transactions;

-- Vacuum with an explicit retention window.
-- 168 hours = 7 days, i.e. the same as the default above; change the number
-- to actually use a custom retention.
VACUUM silver_transactions RETAIN 168 HOURS;
Incremental Load Pattern
from pyspark.sql.functions import col

# Incremental load: copy only the bronze_orders rows newer than what
# silver_orders has already processed, then upsert them into silver_orders.

# Get last processed watermark.
# MAX() over an empty table yields a single row containing NULL, so
# last_watermark is None on the very first run.
last_watermark = spark.sql("""
    SELECT MAX(processed_timestamp) AS watermark
    FROM silver_orders
""").collect()[0]["watermark"]

# Load only new records.
# NOTE(review): the watermark comes from silver_orders.processed_timestamp but
# is compared against bronze_orders.created_at — confirm these share a clock
# and meaning.
new_records = spark.read.format("delta").table("bronze_orders")
if last_watermark is not None:
    new_records = new_records.filter(col("created_at") > last_watermark)
# else: no watermark yet — comparing against NULL would match nothing, so the
# initial run takes every bronze row instead of silently loading zero rows.

# Merge new records: expose them as a temp view so the SQL MERGE can read them.
new_records.createOrReplaceTempView("staging_orders")

# Upsert on order_id: existing orders are overwritten, unseen orders inserted.
spark.sql("""
    MERGE INTO silver_orders AS target
    USING staging_orders AS source
        ON target.order_id = source.order_id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")
SCD Type 2 Pattern
from pyspark.sql.functions import current_timestamp, lit

# SCD Type 2: keep history in dim_customer by closing the current row of every
# customer present in staging_customer and inserting a fresh current row.
# The two statements must run in this order: running the INSERT first would
# cause the UPDATE to close the newly inserted rows as well.

# Close existing records: flag the current row as historical and stamp end_date.
spark.sql("""
    UPDATE dim_customer
    SET is_current = false,
        end_date = current_timestamp()
    WHERE customer_id IN (SELECT customer_id FROM staging_customer)
      AND is_current = true
""")

# Insert new versions: one open-ended (end_date NULL), current row per staging row.
# current_timestamp() is evaluated separately here, so start_date of the new
# row can be slightly later than end_date of the row closed above.
# NOTE(review): the INSERT has no column list, so values are matched to
# dim_customer by position — confirm the table's column order is
# (customer_id, name, email, address, start_date, end_date, is_current).
spark.sql("""
    INSERT INTO dim_customer
    SELECT
        customer_id,
        name,
        email,
        address,
        current_timestamp() AS start_date,
        NULL AS end_date,
        true AS is_current
    FROM staging_customer
""")