AgentSkillsCN

spark-declarative-pipelines

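Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Supports streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures.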

SKILL.md
--- frontmatter
name: spark-declarative-pipelines
description: "Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures."

Lakeflow Spark Declarative Pipelines (SDP)

Official Documentation


Quick Start: Initialize New Pipeline Project

RECOMMENDED: Use databricks pipelines init to create production-ready Asset Bundle projects with multi-environment support.

When to Use Bundle Initialization

Use bundle initialization for:

  • New pipeline projects that should have a professional structure from the start

Use manual workflow for:

  • Quick prototyping without multi-environment needs
  • Existing manual projects you want to continue
  • Learning/experimentation

Step 1: Initialize Project

I will automatically run this command when you request a new pipeline:

bash
databricks pipelines init --output-dir ./my_pipeline

Interactive Prompts:

  • Project name: e.g., customer_orders_pipeline
  • Initial catalog: Unity Catalog name (e.g., main, prod_catalog)
  • Personal schema per user?: yes for dev (each user gets their own schema), no for prod
  • Language: SQL or Python (auto-detected from your request - see language detection below)

Generated Structure:

code
my_pipeline/
├── databricks.yml              # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml      # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/       # Exploratory code in .ipynb
        └── transformations/    # Your .sql or .py files here

Step 2: Customize Transformations

Replace the example code generated by the init process with your own transformation files in src/*_etl/transformations/, based on the provided requirements and the best-practice guidance in this skill.

Step 3: Deploy and Run

bash
# Deploy to workspace (dev by default)
databricks bundle deploy

# Run pipeline
databricks bundle run my_pipeline_etl

# Deploy to production
databricks bundle deploy --target prod

I can run these commands for you using the Bash tool.

For medallion architecture (bronze/silver/gold), two approaches work:

  • Flat with naming (template default): bronze_*.sql, silver_*.sql, gold_*.sql
  • Subdirectories: bronze/orders.sql, silver/cleaned.sql, gold/summary.sql

Both work with the transformations/** glob pattern. Choose based on preference.
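As a rough illustration of why both layouts are picked up, the sketch below approximates a recursive glob with Python's pathlib. The actual matching of transformations/** is done by the Databricks CLI, so treat this as an analogy only; the helper names list_transformations and build_layout are hypothetical.

```python
import tempfile
from pathlib import Path


def list_transformations(root: Path) -> list[str]:
    """Return every .sql/.py file under root, at any depth, as sorted relative paths."""
    return sorted(
        p.relative_to(root).as_posix()
        for p in root.rglob("*")
        if p.is_file() and p.suffix in {".sql", ".py"}
    )


def build_layout(files: list[str]) -> list[str]:
    """Create the given files under a temporary transformations/ folder and list them."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "transformations"
        for name in files:
            path = root / name
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text("-- placeholder\n")
        return list_transformations(root)


# Flat layout with naming prefixes (template default)
flat = build_layout(["bronze_orders.sql", "silver_cleaned.sql", "gold_summary.sql"])

# Subdirectory layout
nested = build_layout(["bronze/orders.sql", "silver/cleaned.sql", "gold/summary.sql"])
```

Both calls find three files, which is the point: a recursive pattern does not care whether the layer is encoded in the file name or in the directory.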

See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.


Alternative: Manual Workflow (Advanced)

For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.

Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline which handles the entire lifecycle.

IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if user explicitly requires R language, Spark RDD APIs, or JAR libraries.

Step 1: Write Pipeline Files Locally

Create .sql or .py files in a local folder:

code
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql

SQL Example (bronze/ingest_orders.sql):

sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Python Example (bronze/ingest_events.py):

python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

Language Selection:

  • Auto-detection: I analyze your request for keywords:
    • SQL indicators: "SQL", "sql files", "simple transformations", "aggregations", "materialized view", "CREATE OR REFRESH"
    • Python indicators: "Python", ".py files", "UDF", "complex logic", "ML inference", "external API", "@dp.table", "pandas"
  • Prompt for clarification when language intent is unclear or mixed
  • Use SQL for: Transformations, aggregations, filtering, joins (most cases)
  • Generate ONE language per request unless you explicitly ask for mixed pipeline

See 8-project-initialization.md for detailed language detection logic.
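The keyword rules above can be sketched roughly as follows. This is a simplified, hypothetical version (the function name detect_language and the "ask" return value are assumptions, and plain substring matching is cruder than whatever 8-project-initialization.md describes); it only encodes the indicators and defaults listed in this section.

```python
# Indicator keywords taken from the list above, lowercased for matching.
# "sql" also covers the "sql files" indicator.
SQL_INDICATORS = (
    "sql",
    "simple transformations",
    "aggregations",
    "materialized view",
    "create or refresh",
)
PYTHON_INDICATORS = (
    "python",
    ".py files",
    "udf",
    "complex logic",
    "ml inference",
    "external api",
    "@dp.table",
    "pandas",
)


def detect_language(request: str) -> str:
    """Return "sql", "python", or "ask" (mixed intent, prompt the user)."""
    text = request.lower()
    wants_sql = any(keyword in text for keyword in SQL_INDICATORS)
    wants_python = any(keyword in text for keyword in PYTHON_INDICATORS)
    if wants_sql and wants_python:
        return "ask"      # mixed signals: ask for clarification
    if wants_python:
        return "python"
    return "sql"          # SQL is the default when nothing points to Python
```

For example, detect_language("I need a UDF calling an external API") returns "python", while a request with no indicators at all falls back to "sql".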

Step 2: Upload to Databricks Workspace

python
# MCP Tool: upload_folder
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)

Step 3: Create/Update and Run Pipeline

Use create_or_update_pipeline - the main entry point. It:

  1. Searches for an existing pipeline with the same name (or uses id from extra_settings)
  2. Creates a new pipeline or updates the existing one
  3. Optionally starts a pipeline run
  4. Optionally waits for completion and returns detailed results

python
# MCP Tool: create_or_update_pipeline
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True,           # Start immediately
    wait_for_completion=True, # Wait and return final status
    full_refresh=True,        # Full refresh all tables
    timeout=1800              # 30 minute timeout
)
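Typing out workspace_file_paths by hand gets tedious as the pipeline grows. A minimal sketch for deriving the list from the local folder is shown below; it assumes upload_folder mirrors the local directory structure under workspace_folder, and the helper name build_workspace_file_paths is hypothetical.

```python
import tempfile
from pathlib import Path, PurePosixPath


def build_workspace_file_paths(local_folder: str, workspace_folder: str) -> list[str]:
    """Map every local .sql/.py file to its expected workspace path (sorted)."""
    root = Path(local_folder)
    return sorted(
        str(PurePosixPath(workspace_folder) / p.relative_to(root).as_posix())
        for p in root.rglob("*")
        if p.is_file() and p.suffix in {".sql", ".py"}
    )


# Demo against a throwaway folder that mimics the layout from Step 1.
with tempfile.TemporaryDirectory() as tmp:
    for rel in ("bronze/ingest_orders.sql", "silver/clean_orders.sql", "gold/daily_summary.sql"):
        target = Path(tmp) / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("-- placeholder\n")
    paths = build_workspace_file_paths(tmp, "/Workspace/Users/user@example.com/my_pipeline")
```

The resulting list can then be passed as workspace_file_paths to create_or_update_pipeline.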

Result contains actionable information:

python
{
    "success": True,                    # Did the operation succeed?
    "pipeline_id": "abc-123",           # Pipeline ID for follow-up operations
    "pipeline_name": "my_orders_pipeline",
    "created": True,                    # True if new, False if updated
    "state": "COMPLETED",               # COMPLETED, FAILED, TIMEOUT, etc.
    "catalog": "my_catalog",            # Target catalog
    "schema": "my_schema",              # Target schema
    "duration_seconds": 45.2,           # Time taken
    "message": "Pipeline created and completed successfully in 45.2s. Tables written to my_catalog.my_schema",
    "error_message": None,              # Error summary if failed
    "errors": []                        # Detailed error list if failed
}

Step 4: Handle Results

On Success:

python
if result["success"]:
    # Verify output tables
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )

On Failure:

python
if not result["success"]:
    # Message includes suggested next steps
    print(result["message"])
    # "Pipeline created but run failed. State: FAILED. Error: Column 'amount' not found.
    #  Use get_pipeline_events(pipeline_id='abc-123') for full details."

    # Get detailed errors
    events = get_pipeline_events(pipeline_id=result["pipeline_id"], max_results=50)

Step 5: Iterate Until Working

  1. Review errors from result or get_pipeline_events
  2. Fix issues in local files
  3. Re-upload with upload_folder
  4. Run create_or_update_pipeline again (it will update, not recreate)
  5. Repeat until result["success"] == True
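The loop above can be sketched as a small driver. The MCP tools are not importable Python functions, so this sketch takes them as callables; iterate_until_success, upload, deploy, and fix are hypothetical names, and the fakes at the bottom only stand in for upload_folder and create_or_update_pipeline.

```python
from typing import Any, Callable

Result = dict[str, Any]


def iterate_until_success(
    upload: Callable[[], None],
    deploy: Callable[[], Result],
    fix: Callable[[Result], None],
    max_attempts: int = 5,
) -> Result:
    """Upload, deploy, and fix in a loop until the result reports success."""
    result: Result = {"success": False}
    for _ in range(max_attempts):
        upload()                    # step 3: re-upload local files
        result = deploy()           # step 4: create or update, then run
        if result.get("success"):
            break                   # step 5: done
        fix(result)                 # steps 1-2: review errors, edit local files
    return result


# Fakes standing in for the real tools: the first run fails, the second succeeds.
calls = {"upload": 0, "fix": 0}
outcomes = iter([
    {"success": False, "message": "Column 'amount' not found"},
    {"success": True, "message": "ok"},
])


def fake_upload() -> None:
    calls["upload"] += 1


def fake_fix(result: Result) -> None:
    calls["fix"] += 1


final = iterate_until_success(fake_upload, lambda: next(outcomes), fake_fix)
```

Capping the number of attempts keeps a persistent error from turning into an endless redeploy cycle.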

Quick Reference: MCP Tools

Primary Tool

| Tool | Description |
| --- | --- |
| create_or_update_pipeline | Main entry point. Creates or updates pipeline, optionally runs and waits. Returns detailed status with success, state, errors, and actionable message. |

Pipeline Management

| Tool | Description |
| --- | --- |
| find_pipeline_by_name | Find existing pipeline by name, returns pipeline_id |
| get_pipeline | Get pipeline configuration and current state |
| start_update | Start pipeline run (validate_only=True for dry run) |
| get_update | Poll update status (QUEUED, RUNNING, COMPLETED, FAILED) |
| stop_pipeline | Stop a running pipeline |
| get_pipeline_events | Get error messages for debugging failed runs |
| delete_pipeline | Delete a pipeline |

Supporting Tools

| Tool | Description |
| --- | --- |
| upload_folder | Upload local folder to workspace (parallel) |
| get_table_details | Verify output tables have expected schema and row counts |
| execute_sql | Run ad-hoc SQL to inspect data |

Reference Documentation (Local)

Load these for detailed patterns:


Best Practices (2025)

Project Structure

  • Default to databricks pipelines init for new projects (creates Asset Bundle)
  • Use Asset Bundles for multi-environment deployments (dev/staging/prod)
  • Manual structure only for quick prototypes or legacy migration
  • Medallion architecture: Two approaches work with Asset Bundles:
    • Flat structure (template default): bronze_*.sql, silver_*.sql, gold_*.sql in transformations/
    • Subdirectories: transformations/bronze/, transformations/silver/, transformations/gold/
    • Both work with the transformations/** glob pattern - choose based on team preference
  • See 8-project-initialization.md for project setup details

Language Selection

  • Auto-detect from user prompt - analyze keywords to infer SQL vs Python
  • Default to SQL unless user specifies Python or task clearly requires it
  • Use SQL for: Transformations, aggregations, filtering, joins (most cases)
  • Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths (use modern pyspark.pipelines as dp)
  • Generate ONE language per request unless user explicitly asks for mixed pipeline

Modern Defaults

  • CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
  • Raw .sql/.py files, not notebooks
  • Serverless compute ONLY - Do not use classic clusters unless explicitly required
  • Unity Catalog (required for serverless)
  • read_files() for cloud storage ingestion - see 1-ingestion-patterns.md

Common Issues

| Issue | Solution |
| --- | --- |
| Empty output tables | Use get_table_details to verify, check upstream sources |
| Pipeline stuck INITIALIZING | Normal for serverless, wait a few minutes |
| "Column not found" | Check schemaHints match actual data |
| Streaming reads fail | Use FROM STREAM(table) for streaming sources |
| Timeout during run | Increase timeout, or use wait_for_completion=False and poll with get_update |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2 schema errors | Let SDP infer __START_AT/__END_AT columns |

For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.
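For the timeout case in the table above, a polling loop might look like the sketch below. It assumes a caller-supplied get_state callable that wraps get_update and returns one of the states listed in this document (QUEUED, RUNNING, COMPLETED, FAILED); the real tool's return shape is not shown here, and poll_until_done is a hypothetical helper.

```python
import time
from typing import Callable

# Terminal states as listed in this document; the real API may report others.
TERMINAL_STATES = {"COMPLETED", "FAILED"}


def poll_until_done(
    get_state: Callable[[], str],
    timeout_s: float = 1800.0,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until a terminal state is seen, or return "TIMEOUT" at the deadline."""
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            return "TIMEOUT"
        sleep(interval_s)


# Demo with a scripted sequence of states and a no-op sleep.
states = iter(["QUEUED", "RUNNING", "COMPLETED"])
final_state = poll_until_done(lambda: next(states), sleep=lambda seconds: None)

# A run that never finishes hits the deadline instead of looping forever.
timed_out = poll_until_done(lambda: "RUNNING", timeout_s=0, sleep=lambda seconds: None)
```

Injecting sleep and clock keeps the helper testable without real waiting.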


Advanced Pipeline Configuration

For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.


Platform Constraints

Serverless Pipeline Requirements (Default)

| Requirement | Details |
| --- | --- |
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |

Serverless Limitations (When Classic Clusters Required)

| Limitation | Workaround |
| --- | --- |
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |

General Constraints

| Constraint | Details |
| --- | --- |
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |

Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.
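The rule above amounts to a small lookup against the serverless limitations table. The sketch below is one hypothetical way to encode it (choose_compute and the feature strings are illustrative names, not part of any Databricks API); it treats the four "use classic clusters if required" rows as the only reasons to leave serverless.

```python
# Features the limitations table marks as "use classic clusters if required".
CLASSIC_ONLY_FEATURES = {
    "r language",
    "spark rdd apis",
    "jar libraries",
    "maven coordinates",
}


def choose_compute(required_features: list[str]) -> tuple[str, list[str]]:
    """Return ("serverless", []) or ("classic", [features that forced the choice])."""
    needed = {feature.strip().lower() for feature in required_features}
    blockers = sorted(needed & CLASSIC_ONLY_FEATURES)
    if blockers:
        return "classic", blockers
    return "serverless", []
```

For example, choose_compute(["JAR libraries", "Auto Loader"]) returns ("classic", ["jar libraries"]), while an empty requirement list stays on serverless.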