Gold Layer MERGE Patterns
Core Principle: Schema-Aware Transformations
Gold layer merge operations read from Silver and must handle:
- Column name differences
- Data type transformations
- Business logic calculations
- SCD Type 2 tracking
Column Name Mapping Pattern
Problem: Column Names Differ Between Layers
Example: Silver has company_rcn, but Gold expects company_retail_control_number
❌ DON'T: Reference non-existent columns
updates_df = (
silver_df
.select(
"store_number",
"company_retail_control_number", # ❌ This column doesn't exist in Silver!
# ...
)
)
✅ DO: Map columns explicitly with withColumn
updates_df = (
silver_df
# Map Silver column to Gold column name
.withColumn("company_retail_control_number", col("company_rcn"))
.select(
"store_number",
"company_retail_control_number", # ✅ Now this exists
# ...
)
)
Variable Naming Conflicts
Problem: Import Conflicts with Local Variables
Critical Rule: NEVER name local variables the same as imported PySpark functions.
❌ DON'T: Shadow imported functions
from pyspark.sql.functions import count
def merge_data():
# Later in the code...
count = updates_df.count() # ❌ Shadows the imported 'count' function!
# This will fail:
df.agg(count("*")) # Error: 'int' object is not callable
✅ DO: Use descriptive variable names
from pyspark.sql.functions import count
def merge_data():
record_count = updates_df.count() # ✅ No conflict
# This works:
df.agg(count("*")) # ✅ Uses the imported function
Common PySpark Functions to Avoid as Variable Names
- `count` → use `record_count`, `row_count`, `num_records`
- `sum` → use `total`, `sum_value`, `aggregated_sum`
- `min` → use `min_value`, `minimum`
- `max` → use `max_value`, `maximum`
- `round` → use `rounded_value`, `result`
- `filter` → use `filtered_df`, `subset`
Merge Operation Patterns
SCD Type 1 (Overwrite)
Use for: Dimension tables where history doesn't matter
Template: See assets/templates/scd-type1-merge.py for complete pattern.
def merge_dim_product(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge dim_product from Silver to Gold (SCD Type 1).

    SCD Type 1: matched rows are fully overwritten, so no change
    history is kept for this dimension.

    Args:
        spark: Active SparkSession.
        catalog: Catalog containing both schemas.
        silver_schema: Schema of the Silver source table.
        gold_schema: Schema of the Gold target table.
    """
    silver_table = f"{catalog}.{silver_schema}.silver_product_dim"
    gold_table = f"{catalog}.{gold_schema}.dim_product"
    silver_df = spark.table(silver_table)
    # Prepare updates with explicit column mappings (Silver -> Gold names)
    updates_df = (
        silver_df
        # Business key; NOTE(review): product_key is just upc_code here —
        # confirm this matches the surrogate-key strategy of the target table.
        .withColumn("product_key", col("upc_code"))
        .withColumn("record_updated_timestamp", current_timestamp())
        .select(
            "product_key", "upc_code", "product_description",
            # ... other columns
            "record_updated_timestamp"
        )
    )
    delta_gold = DeltaTable.forName(spark, gold_table)
    # SCD Type 1: overwrite every field when the business key matches,
    # insert brand-new keys as-is.
    delta_gold.alias("target").merge(
        updates_df.alias("source"),
        "target.product_key = source.product_key"
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()
    record_count = updates_df.count()  # ✅ does not shadow pyspark's count()
    print(f"✓ Merged {record_count} records into dim_product")
SCD Type 2 (Historical Tracking)
Use for: Dimension tables where you need to track changes over time
Template: See assets/templates/scd-type2-merge.py for complete pattern.
def merge_dim_store(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge dim_store from Silver to Gold (SCD Type 2).

    Args:
        spark: Active SparkSession.
        catalog: Catalog containing both schemas.
        silver_schema: Schema of the Silver source table.
        gold_schema: Schema of the Gold target table.

    NOTE(review): canonical SCD Type 2 also expires the previous current
    row (sets effective_to and is_current = false) before inserting the
    new version; this snippet only refreshes record_updated_timestamp on
    matches — confirm against assets/templates/scd-type2-merge.py.
    """
    silver_table = f"{catalog}.{silver_schema}.silver_store_dim"
    gold_table = f"{catalog}.{gold_schema}.dim_store"
    silver_df = spark.table(silver_table)
    updates_df = (
        silver_df
        # Surrogate key: hash of business key + load timestamp, so each
        # new Silver version of a store yields a distinct key.
        .withColumn("store_key", md5(concat_ws("||", col("store_id"), col("processed_timestamp"))))
        # SCD Type 2 tracking columns
        .withColumn("effective_from", col("processed_timestamp"))
        .withColumn("effective_to", lit(None).cast("timestamp"))  # open-ended until superseded
        .withColumn("is_current", lit(True))
        # Derived columns
        .withColumn("store_status",
            when(col("close_date").isNotNull(), "Closed").otherwise("Active"))
        # Column mappings (Silver name -> Gold name)
        .withColumn("company_retail_control_number", col("company_rcn"))
        # Audit timestamps
        .withColumn("record_created_timestamp", current_timestamp())
        .withColumn("record_updated_timestamp", current_timestamp())
        .select(
            "store_key", "store_number", "store_name",
            "company_retail_control_number",  # Mapped column
            "effective_from", "effective_to", "is_current",
            # ... other columns
        )
    )
    delta_gold = DeltaTable.forName(spark, gold_table)
    # Match only the CURRENT version of each store; historical (expired)
    # rows are never touched by this merge.
    delta_gold.alias("target").merge(
        updates_df.alias("source"),
        "target.store_number = source.store_number AND target.is_current = true"
    ).whenMatchedUpdate(set={
        "record_updated_timestamp": "source.record_updated_timestamp"
    }).whenNotMatchedInsertAll(
    ).execute()
    record_count = updates_df.count()  # ✅ does not shadow pyspark's count()
    print(f"✓ Merged {record_count} records into dim_store")
Fact Table Aggregation
Use for: Pre-aggregated fact tables from transactional Silver data
Template: See assets/templates/fact-table-aggregation-merge.py for complete pattern.
def merge_fact_sales_daily(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge fact_sales_daily from Silver to Gold.

    Aggregates transaction-level Silver data to one row per
    (store_number, upc_code, transaction_date) and upserts the result
    into the Gold fact table.

    Args:
        spark: Active SparkSession.
        catalog: Catalog containing both schemas.
        silver_schema: Schema of the Silver source table.
        gold_schema: Schema of the Gold target table.
    """
    silver_table = f"{catalog}.{silver_schema}.silver_transactions"
    gold_table = f"{catalog}.{gold_schema}.fact_sales_daily"
    transactions = spark.table(silver_table)
    # Aggregate daily sales
    daily_sales = (
        transactions
        .groupBy("store_number", "upc_code", "transaction_date")
        .agg(
            # Gross revenue/units count only positive-quantity (sale) lines;
            # returns and voids contribute 0. Net revenue includes them.
            spark_sum(when(col("quantity_sold") > 0, col("final_sales_price")).otherwise(0)).alias("gross_revenue"),
            spark_sum(col("final_sales_price")).alias("net_revenue"),
            spark_sum(when(col("quantity_sold") > 0, col("quantity_sold")).otherwise(0)).alias("units_sold"),
            count("*").alias("transaction_count"),  # ✅ imported PySpark function
            # ... more aggregations
        )
        .withColumn("record_created_timestamp", current_timestamp())
        .withColumn("record_updated_timestamp", current_timestamp())
    )
    delta_gold = DeltaTable.forName(spark, gold_table)
    delta_gold.alias("target").merge(
        daily_sales.alias("source"),
        """target.store_number = source.store_number
        AND target.upc_code = source.upc_code
        AND target.transaction_date = source.transaction_date"""
    ).whenMatchedUpdate(set={
        # BUGFIX: gross_revenue was computed in the aggregation but missing
        # from this update set, so re-merged days kept a stale gross_revenue.
        "gross_revenue": "source.gross_revenue",
        "net_revenue": "source.net_revenue",
        "units_sold": "source.units_sold",
        "transaction_count": "source.transaction_count",
        # record_created_timestamp is intentionally NOT updated on match.
        "record_updated_timestamp": "source.record_updated_timestamp"
    }).whenNotMatchedInsertAll(
    ).execute()
    record_count = daily_sales.count()  # ✅ does not shadow pyspark's count()
    print(f"✓ Merged {record_count} records into fact_sales_daily")
Schema Evolution Handling
Data Type Changes
# Example: INT to BIGINT migration
.withColumn("quantity_sold", col("quantity_sold").cast("bigint"))
# Example: DECIMAL to DOUBLE migration
.withColumn("price", col("price").cast("double"))
Adding Derived Columns
# Always calculate in the merge script, not in the target table
.withColumn("total_discount",
coalesce(col("multi_unit_discount"), lit(0)) +
coalesce(col("coupon_discount"), lit(0)) +
coalesce(col("loyalty_discount"), lit(0)))
Error Handling Pattern
def main():
    """Entry point: run every Gold layer MERGE, then shut Spark down."""
    catalog, silver_schema, gold_schema = get_parameters()
    spark = SparkSession.builder.appName("Gold Layer MERGE").getOrCreate()
    try:
        # Dimensions first, then facts, so fact merges see fresh dimensions.
        for merge_step in (
            merge_dim_store,
            merge_dim_product,
            merge_dim_date,
            merge_fact_sales_daily,
            merge_fact_inventory_snapshot,
        ):
            merge_step(spark, catalog, silver_schema, gold_schema)
        banner = "=" * 80
        print("\n" + banner)
        print("✓ Gold layer MERGE completed successfully!")
        print(banner)
    except Exception as e:
        # Surface the failure for job logs, then re-raise so the run fails.
        print(f"\n❌ Error during Gold layer MERGE: {str(e)}")
        raise
    finally:
        # Always release the Spark session, success or failure.
        spark.stop()
Validation Checklist
Before deploying Gold merge scripts:
- All Silver column references exist
- Column mappings are explicit (using `.withColumn()`)
- No variable names shadow PySpark functions
- MERGE conditions use correct join keys
- SCD Type 2 merge condition includes the `is_current` filter
- Timestamps are added (`record_created_timestamp`, `record_updated_timestamp`)
- Error handling with try/except
- Meaningful print statements for monitoring
Common Errors and Solutions
Error: Column 'X' does not exist
Solution: Check Silver table schema. Add explicit column mapping if names differ.
Error: 'int' object is not callable
Solution: Variable name shadows a PySpark function. Rename the variable.
Error: Cartesian product detected
Solution: MERGE condition is missing or incorrect. Add proper join keys.
Error: Schema mismatch during MERGE
Solution: Cast columns to match target table schema explicitly.
References
Reference Files
This skill includes the following template files:
- `assets/templates/scd-type1-merge.py` — Complete SCD Type 1 merge pattern template
- `assets/templates/scd-type2-merge.py` — Complete SCD Type 2 merge pattern template
- `assets/templates/fact-table-aggregation-merge.py` — Fact table aggregation merge template