Lakeflow Spark Declarative Pipelines (SDP)
Official Documentation
- •Lakeflow Spark Declarative Pipelines Overview - Main documentation hub
- •SQL Language Reference - SQL syntax for streaming tables and materialized views
- •Python Language Reference - pyspark.pipelines API
- •Loading Data - Auto Loader, Kafka, Kinesis ingestion
- •Change Data Capture (CDC) - AUTO CDC, SCD Type 1/2
- •Developing Pipelines - File structure, testing, validation
- •Liquid Clustering - Modern data layout optimization
Workflow Selection
Determine the task type and follow the appropriate workflow:
| Task | Workflow |
|---|---|
| Build new pipeline | Follow "Development Workflow with MCP Tools" below |
| Review/audit existing pipelines | Follow "Review Mode" below |
| Migrate legacy DLT to SDP | See 6-dlt-migration.md |
| Troubleshoot pipeline failures | Use get_pipeline_events(), see "Common Issues" |
Review Mode
When asked to review, audit, check, or improve existing pipelines:
Step 1: Locate Pipeline Definitions
Search for pipeline configurations:
- •DAB pipelines: **/dlt_pipelines/*.yml or resources/pipelines in databricks.yml
- •Source files: .sql and .py files referenced in pipeline libraries
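The search above can be sketched with the standard library. This is a minimal sketch, not part of the MCP tooling: find_pipeline_files is a hypothetical helper, and it assumes the repository is checked out locally. It returns candidate files only; confirm each source file against the pipeline's libraries list.

```python
from pathlib import Path


def find_pipeline_files(repo_root):
    """Return (config_files, source_files) found under repo_root."""
    root = Path(repo_root)
    # Pipeline definitions: YAML files in any dlt_pipelines/ folder, plus databricks.yml.
    configs = sorted(
        set(root.glob("**/dlt_pipelines/*.yml")) | set(root.glob("**/databricks.yml"))
    )
    # Candidate source files; not every .sql/.py file is necessarily a pipeline library.
    sources = sorted(
        p for p in root.rglob("*") if p.is_file() and p.suffix in {".sql", ".py"}
    )
    return configs, sources
```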
Step 2: Check Against 2025 Best Practices
| Check | Legacy Pattern | Modern Pattern | Reference |
|---|---|---|---|
| Python API | import dlt | from pyspark import pipelines as dp | 5-python-api.md |
| Ingestion | spark.readStream.format("cloudFiles") | read_files() SQL syntax | 1-ingestion-patterns.md |
| Clustering | PARTITION BY or none | CLUSTER BY (Liquid Clustering) | 4-performance-tuning.md |
| File format | Notebook (# Databricks notebook source) | Raw .sql/.py files | Best practice |
| SCD patterns | dlt.apply_changes() | AUTO CDC INTO SQL syntax | 3-scd-patterns.md |
Step 3: Review Checklist
- • Modern API: Using pyspark.pipelines (dp) not legacy dlt
- • SQL preferred: Transformations in SQL unless Python required
- • Liquid Clustering: CLUSTER BY on all tables, especially facts
- • Data quality: Expectations defined (EXPECT, @dp.expect_or_drop)
- • No hardcoded configs: Variables from pipeline configuration, not SET statements
- • Raw files: Not notebook format with # COMMAND ----------
- • Serverless: No classic cluster configuration unless required
Step 4: Provide Recommendations
For each issue found:
- •Identify the file and line number
- •Show the current (legacy) code
- •Show the recommended (modern) code
- •Reference the appropriate documentation file
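Part of this review can be automated with a line-by-line scan. The sketch below is illustrative only: find_legacy_patterns and LEGACY_PATTERNS are hypothetical names, the regular expressions are an assumed approximation of the Step 2 table, and the PARTITION BY rule will also flag window functions, so treat every finding as a prompt for manual review.

```python
import re

# (legacy pattern, recommended modern pattern, reference), following the Step 2 table.
LEGACY_PATTERNS = [
    (re.compile(r"^\s*import dlt\b"),
     "from pyspark import pipelines as dp", "5-python-api.md"),
    (re.compile(r"dlt\.apply_changes\("),
     "AUTO CDC INTO SQL syntax", "3-scd-patterns.md"),
    (re.compile(r"\bPARTITION(ED)?\s+BY\b", re.IGNORECASE),
     "CLUSTER BY (Liquid Clustering)", "4-performance-tuning.md"),
    (re.compile(r"^(#|--) (Databricks notebook source|COMMAND -+)"),
     "Raw .sql/.py files", "Best practice"),
]


def find_legacy_patterns(path, text):
    """Return one finding per legacy match: file, line, current code, fix, reference."""
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for pattern, modern, reference in LEGACY_PATTERNS:
            if pattern.search(line):
                findings.append({
                    "file": path,
                    "line": lineno,
                    "current": line.strip(),
                    "recommended": modern,
                    "reference": reference,
                })
    return findings
```

Each finding carries the four items listed above, so it can be rendered directly as a recommendation.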
Development Workflow with MCP Tools
Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline, which handles the entire lifecycle.
IMPORTANT: Always create serverless pipelines (the default). Only use classic clusters if the user explicitly requires the R language, Spark RDD APIs, or JAR libraries.
Step 1: Write Pipeline Files Locally
Create .sql or .py files in a local folder:
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql    # SQL (default for most cases)
│   └── ingest_events.py     # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
SQL Example (bronze/ingest_orders.sql):
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
Python Example (bronze/ingest_events.py):
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
Language Selection:
- •Default to SQL unless user specifies Python or task requires it
- •Use SQL for: Transformations, aggregations, filtering, joins (90% of use cases)
- •Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths
- •Generate ONE language per request unless user explicitly asks for mixed pipeline
Step 2: Upload to Databricks Workspace
# MCP Tool: upload_folder
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)
Step 3: Create/Update and Run Pipeline
Use create_or_update_pipeline - the main entry point. It:
- •Searches for an existing pipeline with the same name (or uses id from extra_settings)
- •Creates a new pipeline or updates the existing one
- •Optionally starts a pipeline run
- •Optionally waits for completion and returns detailed results
# MCP Tool: create_or_update_pipeline
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True,            # Start immediately
    wait_for_completion=True,  # Wait and return final status
    full_refresh=True,         # Full refresh all tables
    timeout=1800               # 30 minute timeout
)
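The workspace_file_paths argument names each source file explicitly. A minimal sketch of deriving that list from the local folder follows. It assumes upload_folder preserves the folder's relative layout, as the example paths suggest, and workspace_paths_for is a hypothetical helper rather than an MCP tool.

```python
from pathlib import Path, PurePosixPath


def workspace_paths_for(local_folder, workspace_folder):
    """Map every local .sql/.py file to its expected workspace path."""
    local = Path(local_folder)
    return sorted(
        # Workspace paths always use forward slashes, whatever the local OS.
        str(PurePosixPath(workspace_folder) / p.relative_to(local).as_posix())
        for p in local.rglob("*")
        if p.is_file() and p.suffix in {".sql", ".py"}
    )
```

The returned list can be passed as workspace_file_paths, which avoids a stale list when files are added or renamed.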
Result contains actionable information:
{
    "success": True,                  # Did the operation succeed?
    "pipeline_id": "abc-123",         # Pipeline ID for follow-up operations
    "pipeline_name": "my_orders_pipeline",
    "created": True,                  # True if new, False if updated
    "state": "COMPLETED",             # COMPLETED, FAILED, TIMEOUT, etc.
    "catalog": "my_catalog",          # Target catalog
    "schema": "my_schema",            # Target schema
    "duration_seconds": 45.2,         # Time taken
    "message": "Pipeline created and completed successfully in 45.2s. Tables written to my_catalog.my_schema",
    "error_message": None,            # Error summary if failed
    "errors": []                      # Detailed error list if failed
}
Step 4: Handle Results
On Success:
if result["success"]:
    # Verify output tables
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )
On Failure:
if not result["success"]:
    # Message includes suggested next steps
    print(result["message"])
    # "Pipeline created but run failed. State: FAILED. Error: Column 'amount' not found.
    #  Use get_pipeline_events(pipeline_id='abc-123') for full details."

    # Get detailed errors
    events = get_pipeline_events(pipeline_id=result["pipeline_id"], max_results=50)
Step 5: Iterate Until Working
- •Review errors from result or get_pipeline_events
- •Fix issues in local files
- •Re-upload with upload_folder
- •Run create_or_update_pipeline again (it will update, not recreate)
- •Repeat until result["success"] == True
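One pass of this loop can be sketched as a single function. The sketch is hedged: run_cycle is a hypothetical helper, the three callables stand in for upload_folder, create_or_update_pipeline, and get_pipeline_events, and it assumes get_pipeline_events returns an iterable of events. Fixing the local files between passes remains a manual step.

```python
def run_cycle(upload, deploy, get_events, max_events=50):
    """Run one upload + deploy pass; return (ok, details_to_review)."""
    upload()
    result = deploy()
    if result["success"]:
        return True, [result["message"]]
    # On failure, gather the summary, the error list, and the detailed events.
    details = [result["message"]] + list(result.get("errors") or [])
    details += list(
        get_events(pipeline_id=result["pipeline_id"], max_results=max_events)
    )
    return False, details
```

Passing the tools in as callables keeps the sketch independent of how the MCP client exposes them.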
Quick Reference: MCP Tools
Primary Tool
| Tool | Description |
|---|---|
| create_or_update_pipeline | Main entry point. Creates or updates pipeline, optionally runs and waits. Returns detailed status with success, state, errors, and actionable message. |
Pipeline Management
| Tool | Description |
|---|---|
| find_pipeline_by_name | Find existing pipeline by name, returns pipeline_id |
| get_pipeline | Get pipeline configuration and current state |
| start_update | Start pipeline run (validate_only=True for dry run) |
| get_update | Poll update status (QUEUED, RUNNING, COMPLETED, FAILED) |
| stop_pipeline | Stop a running pipeline |
| get_pipeline_events | Get error messages for debugging failed runs |
| delete_pipeline | Delete a pipeline |
Supporting Tools
| Tool | Description |
|---|---|
| upload_folder | Upload local folder to workspace (parallel) |
| get_table_details | Verify output tables have expected schema and row counts |
| execute_sql | Run ad-hoc SQL to inspect data |
Reference Documentation (Local)
Load these for detailed patterns:
- •1-ingestion-patterns.md - Auto Loader, Kafka, Event Hub, Kinesis, file formats
- •2-streaming-patterns.md - Deduplication, windowing, stateful operations, joins
- •3-scd-patterns.md - Querying SCD Type 2 history tables, temporal joins
- •4-performance-tuning.md - Liquid Clustering, optimization, state management
- •5-python-api.md - Modern dp API vs legacy dlt API comparison
- •6-dlt-migration.md - Migrating existing DLT pipelines to SDP
- •7-advanced-configuration.md - extra_settings parameter reference and examples
Best Practices (2025)
Language Selection
- •Default to SQL unless user specifies Python or task clearly requires it
- •Use SQL for: Transformations, aggregations, filtering, joins (most cases)
- •Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths (use modern pyspark.pipelines as dp)
- •Generate ONE language per request unless user explicitly asks for mixed pipeline
Modern Defaults
- •CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
- •Raw .sql/.py files, not notebooks
- •Serverless compute ONLY - Do not use classic clusters unless explicitly required
- •Unity Catalog (required for serverless)
- •read_files() for cloud storage ingestion - see 1-ingestion-patterns.md
Common Issues
| Issue | Solution |
|---|---|
| Empty output tables | Use get_table_details to verify, check upstream sources |
| Pipeline stuck INITIALIZING | Normal for serverless, wait a few minutes |
| "Column not found" | Check schemaHints match actual data |
| Streaming reads fail | Use FROM STREAM(table) for streaming sources |
| Timeout during run | Increase timeout, or use wait_for_completion=False and poll with get_update |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2 schema errors | Let SDP infer START_AT/END_AT columns |
For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.
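For the timeout case, polling can be sketched as a small loop. This is a minimal sketch under assumptions: wait_for_update is a hypothetical helper, get_state is a caller-supplied callable that wraps get_update and returns the state string (the return shape of get_update is not documented here), and the terminal states are limited to those listed in the tool description, so others may exist.

```python
import time

# Terminal states taken from the get_update description; others may exist.
TERMINAL_STATES = {"COMPLETED", "FAILED"}


def wait_for_update(get_state, timeout=1800, interval=15,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until a terminal state or until timeout seconds elapse."""
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            return "TIMEOUT"
        sleep(interval)
```

The sleep and clock parameters exist so the loop can be exercised without real waiting; the defaults use real time.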
Advanced Pipeline Configuration
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.
Platform Constraints
Serverless Pipeline Requirements (Default)
| Requirement | Details |
|---|---|
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |
Serverless Limitations (When Classic Clusters Required)
| Limitation | Workaround |
|---|---|
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |
General Constraints
| Constraint | Details |
|---|---|
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.