Gold Layer Delta MERGE Deduplication Pattern
Overview
Gold layer merge operations read from Silver and write to Gold using Delta MERGE. This skill standardizes the deduplication pattern required to prevent [DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] errors.
When to Use This Skill
Use this skill when:
- •Merging from Silver to Gold tables
- •Silver may contain duplicate business keys due to:
- •Incremental DLT streaming ingestion
- •Change Data Capture (CDC) patterns
- •Historical SCD Type 2 tracking
- •Test data generation creating duplicates
- •Multiple batch loads
- •Encountering
DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGEerrors
Critical Rule: Always Deduplicate Before MERGE
Delta MERGE operations REQUIRE unique source keys. Multiple source rows matching the same target row causes ambiguity and fails the merge.
Without deduplication: MERGE fails with error:
[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways.
Standard Deduplication Pattern
✅ CORRECT: Deduplicate Before MERGE
def merge_dim_store(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
"""Merge dim_store from Silver to Gold (SCD Type 2)."""
print("Merging dim_store...")
silver_table = f"{catalog}.{silver_schema}.silver_store_dim"
gold_table = f"{catalog}.{gold_schema}.dim_store"
# CRITICAL: Deduplicate Silver source before merging
silver_raw = spark.table(silver_table)
original_count = silver_raw.count()
silver_df = (
silver_raw
.orderBy(col("processed_timestamp").desc()) # Order by timestamp first (latest first)
.dropDuplicates(["store_number"]) # Keep only first occurrence per business key
)
dedupe_count = silver_df.count()
print(f" Deduplicated: {original_count} → {dedupe_count} records ({original_count - dedupe_count} duplicates removed)")
# Prepare Gold updates
updates_df = (
silver_df
.withColumn("store_key", md5(concat_ws("||", col("store_id"), col("processed_timestamp"))))
# ... other transformations
)
# MERGE into Gold
delta_gold = DeltaTable.forName(spark, gold_table)
delta_gold.alias("target").merge(
updates_df.alias("source"),
"target.store_number = source.store_number AND target.is_current = true"
).whenMatchedUpdate(set={
"record_updated_timestamp": "source.record_updated_timestamp"
}).whenNotMatchedInsertAll(
).execute()
record_count = updates_df.count()
print(f"✓ Merged {record_count} records into dim_store")
❌ WRONG: No Deduplication
# ❌ BAD: No deduplication - reads ALL Silver records including duplicates
silver_df = spark.table(silver_table)
# ❌ MERGE will fail if multiple rows have same store_number
delta_gold.alias("target").merge(
updates_df.alias("source"),
"target.store_number = source.store_number"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Key Requirements
1. Order by Timestamp Descending
Always order by processed_timestamp DESC before deduplication to keep the LATEST record:
.orderBy(col("processed_timestamp").desc()) # Latest first
Why important:
- •Ensures most recent data is kept
- •Older versions are dropped
- •Consistent with SCD Type 2 patterns
2. Match Deduplication Key to Merge Key
CRITICAL: The column used in .dropDuplicates() MUST match the column in the MERGE condition.
✅ CORRECT: Keys Match
# Deduplicate on product_code
silver_df = (
silver_raw
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["product_code"])
)
# Merge on product_code (SAME KEY)
delta_gold.alias("target").merge(
updates_df.alias("source"),
"target.product_code = source.product_code"
).execute()
❌ WRONG: Keys Don't Match
# Deduplicate on upc_code .dropDuplicates(["upc_code"]) # ❌ WRONG KEY # Merge on product_code (DIFFERENT KEY!) "target.product_code = source.product_code" # Still has duplicates!
Result: MERGE still fails because multiple upc_code records can have the same product_code.
3. Use dropDuplicates() Not Window Functions
Preferred approach: .dropDuplicates() with ordering
# ✅ SIMPLE and RELIABLE
silver_df = (
spark.table(silver_table)
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["store_number"])
)
Avoid: Window functions with row_number() - more complex with no benefit.
4. Add Debug Logging
Always log deduplication metrics for visibility:
original_count = silver_raw.count()
dedupe_count = silver_df.count()
print(f" Deduplicated: {original_count} → {dedupe_count} records ({original_count - dedupe_count} duplicates removed)")
Benefits:
- •Proves deduplication is working
- •Shows magnitude of duplicate problem
- •Helps troubleshoot merge failures
- •Provides data quality insights
Quick Reference
Standard Pattern Template
silver_raw = spark.table(silver_table)
original_count = silver_raw.count()
silver_df = (
silver_raw
.orderBy(col("processed_timestamp").desc()) # Latest first
.dropDuplicates([business_key]) # Keep first (latest) per business key
)
dedupe_count = silver_df.count()
print(f" Deduplicated: {original_count} → {dedupe_count} records")
# Then proceed with merge
delta_gold.alias("target").merge(
updates_df.alias("source"),
f"target.{business_key} = source.{business_key}" # MUST MATCH deduplication key
).execute()
Core Patterns
Pattern 1: Simple SCD Type 1 (Overwrite)
# Deduplicate on product_code (business key)
silver_df = (
silver_raw
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["product_code"])
)
delta_gold.alias("target").merge(
updates_df.alias("source"),
"target.product_code = source.product_code"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Pattern 2: SCD Type 2 (Historical Tracking)
# Deduplicate on store_number (business key)
silver_df = (
silver_raw
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["store_number"])
)
delta_gold.alias("target").merge(
updates_df.alias("source"),
"target.store_number = source.store_number AND target.is_current = true"
).whenMatchedUpdate(set={
"record_updated_timestamp": "source.record_updated_timestamp"
}).whenNotMatchedInsertAll().execute()
Pattern 3: Fact Table with Aggregation
# Aggregation handles deduplication, but MERGE key must be composite
daily_sales = (
transactions
.groupBy("store_number", "upc_code", "transaction_date")
.agg(
spark_sum(col("final_sales_price")).alias("net_revenue"),
spark_sum(col("quantity_sold")).alias("net_units")
)
)
delta_gold.alias("target").merge(
daily_sales.alias("source"),
"""target.store_number = source.store_number
AND target.upc_code = source.upc_code
AND target.transaction_date = source.transaction_date"""
).execute()
See references/dedup-patterns.md for complete pattern examples.
Validation Checklist
Before deploying Gold MERGE operations:
- • Source data is deduplicated with
.dropDuplicates() - • Deduplication key matches MERGE condition key
- • Ordered by
processed_timestamp DESC - • Debug logging shows deduplication metrics
- • Error handling with try/except
- • Success/failure messages printed
- • Tested with actual Silver data containing duplicates
- • MERGE condition uses correct business key
Common Mistakes to Avoid
❌ Mistake 1: No Deduplication
# BAD silver_df = spark.table(silver_table)
❌ Mistake 2: Wrong Deduplication Key
# BAD - Deduplicate on upc_code but merge on product_code .dropDuplicates(["upc_code"]) # ... later ... "target.product_code = source.product_code"
❌ Mistake 3: No Ordering
# BAD - Which duplicate to keep is non-deterministic .dropDuplicates(["store_number"]) # No orderBy!
✅ CORRECT: Simple and Reliable
# GOOD
silver_df = (
spark.table(silver_table)
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["store_number"])
)
Performance Considerations
1. Deduplicate Before Transformations
Deduplicate BEFORE applying transformations to reduce compute:
# ✅ GOOD - Deduplicate first, then transform
silver_df = (
spark.table(silver_table)
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["store_number"]) # Reduces data size
)
updates_df = apply_transformations(silver_df) # Works on smaller dataset
2. Partition Pruning
If Silver tables are partitioned, leverage partition pruning:
silver_df = (
spark.table(silver_table)
.filter(col("transaction_date") >= "2025-01-01") # Partition pruning
.orderBy(col("processed_timestamp").desc())
.dropDuplicates(["store_number"])
)
Reference Files
- •
references/dedup-patterns.md- Detailed deduplication SQL patterns, generic reusable functions, complete scenario examples, and troubleshooting patterns
Related Patterns
- •Gold Layer Schema Validation - See
gold/gold-layer-schema-validationskill - •Gold Layer Merge Patterns - See
gold/gold-layer-merge-patternsskill
References
- •Delta Lake MERGE - Official documentation
- •PySpark dropDuplicates
Key Takeaway
Delta MERGE operations require unique source keys. ALWAYS deduplicate Silver data before merging to Gold using
.orderBy(col("processed_timestamp").desc()).dropDuplicates([business_key])to prevent[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE]errors.
This pattern is mandatory for all Gold layer MERGE operations.