spark-declarative-pipelines

SKILL.md
---
name: spark-declarative-pipelines
description: "Creates, configures, updates, reviews, and troubleshoots Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion. Use when: (1) building new data pipelines, (2) reviewing or auditing existing pipelines for best practices, (3) migrating legacy DLT Python to modern SDP, (4) troubleshooting pipeline failures. Triggers: SDP, LDP, DLT, Delta Live Tables, Lakeflow, streaming tables, materialized views, APPLY CHANGES, CDC, bronze/silver/gold, medallion architecture, @dlt.table, @dp.table, read_files, cloudFiles, pipeline review, pipeline audit, pipeline best practices."
---

Lakeflow Spark Declarative Pipelines (SDP)

Official Documentation


Workflow Selection

Determine the task type and follow the appropriate workflow:

| Task | Workflow |
|---|---|
| Build new pipeline | Follow "Development Workflow with MCP Tools" below |
| Review/audit existing pipelines | Follow "Review Mode" below |
| Migrate legacy DLT to SDP | See 6-dlt-migration.md |
| Troubleshoot pipeline failures | Use get_pipeline_events(), see "Common Issues" |

Review Mode

When asked to review, audit, check, or improve existing pipelines:

Step 1: Locate Pipeline Definitions

Search for pipeline configurations:

  • Databricks Asset Bundle (DAB) pipelines: **/dlt_pipelines/*.yml, or the resources.pipelines mapping in databricks.yml
  • Source files: .sql and .py files referenced in pipeline libraries

Step 2: Check Against 2025 Best Practices

| Check | Legacy Pattern | Modern Pattern | Reference |
|---|---|---|---|
| Python API | import dlt | from pyspark import pipelines as dp | 5-python-api.md |
| Ingestion | spark.readStream.format("cloudFiles") | read_files() SQL syntax | 1-ingestion-patterns.md |
| Clustering | PARTITION BY or none | CLUSTER BY (Liquid Clustering) | 4-performance-tuning.md |
| File format | Notebook (# Databricks notebook source) | Raw .sql/.py files | Best practice |
| SCD patterns | dlt.apply_changes() | AUTO CDC INTO SQL syntax | 3-scd-patterns.md |

Step 3: Review Checklist

  • Modern API: Using pyspark.pipelines (dp) not legacy dlt
  • SQL preferred: Transformations in SQL unless Python required
  • Liquid Clustering: CLUSTER BY on all tables, especially large fact tables
  • Data quality: Expectations defined (EXPECT, @dp.expect_or_drop)
  • No hardcoded configs: Variables from pipeline configuration, not SET statements
  • Raw files: Not notebook format with # COMMAND ----------
  • Serverless: No classic cluster configuration unless required

Step 4: Provide Recommendations

For each issue found:

  1. Identify the file and line number
  2. Show the current (legacy) code
  3. Show the recommended (modern) code
  4. Reference the appropriate documentation file
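Steps 2 to 4 can be partly automated. The sketch below is a hypothetical helper (scan_pipeline_sources, Finding, and the RULES list are illustrative names, not part of any Databricks tool) that walks a folder of .sql/.py sources and reports every line matching a legacy pattern from the Step 2 table or the Step 3 checklist, together with the file, line number, recommended pattern, and reference file that Step 4 asks for. The regexes are heuristics under stated assumptions: PARTITIONED BY and partition_cols are matched instead of a bare PARTITION BY so that window functions are not flagged, and each hit still needs a human review.

```python
import re
from dataclasses import dataclass
from pathlib import Path

# (legacy regex, recommended pattern, reference, file suffixes checked).
# Heuristic rules mirroring the Step 2 table and Step 3 checklist; hypothetical.
RULES = [
    (re.compile(r"^\s*(import|from)\s+dlt\b"),
     "from pyspark import pipelines as dp", "5-python-api.md", {".py"}),
    (re.compile(r"format\(\s*[\"']cloudFiles[\"']\s*\)"),
     "read_files() SQL syntax", "1-ingestion-patterns.md", {".py"}),
    # Match DDL PARTITIONED BY / Python partition_cols, not window PARTITION BY.
    (re.compile(r"\bPARTITIONED\s+BY\b|\bpartition_cols\s*=", re.IGNORECASE),
     "CLUSTER BY (Liquid Clustering)", "4-performance-tuning.md", {".sql", ".py"}),
    # Notebook export markers: "# ..." in Python, "-- ..." in SQL.
    (re.compile(r"^(#|--)\s*(Databricks notebook source|COMMAND\s*-+)"),
     "Raw .sql/.py file without notebook markers", "Best practice", {".sql", ".py"}),
    (re.compile(r"\bdlt\.apply_changes\s*\("),
     "AUTO CDC INTO SQL syntax", "3-scd-patterns.md", {".py"}),
    (re.compile(r"^\s*SET\s+\S", re.IGNORECASE),
     "Value from the pipeline configuration", "Best practice", {".sql"}),
]


@dataclass
class Finding:
    file: str
    line: int
    current: str
    recommended: str
    reference: str


def scan_pipeline_sources(root):
    """Return one Finding per (line, rule) match in .sql/.py files under root."""
    findings = []
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file() or path.suffix not in {".sql", ".py"}:
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        for lineno, line in enumerate(text.splitlines(), start=1):
            for pattern, recommended, reference, suffixes in RULES:
                if path.suffix in suffixes and pattern.search(line):
                    findings.append(
                        Finding(str(path), lineno, line.strip(), recommended, reference)
                    )
    return findings
```

For example, `for f in scan_pipeline_sources("my_pipeline"): print(f.file, f.line, f.current, "->", f.recommended, f.reference)` prints one candidate recommendation per flagged line.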

Development Workflow with MCP Tools

Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline, which handles the entire lifecycle.

IMPORTANT: Always create serverless pipelines (the default). Only use classic clusters if the user explicitly requires the R language, Spark RDD APIs, or JAR libraries.

Step 1: Write Pipeline Files Locally

Create .sql or .py files in a local folder:

```
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
```

SQL Example (bronze/ingest_orders.sql):

```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
-- STREAM makes read_files an incremental source, as a streaming table requires
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Python Example (bronze/ingest_events.py):

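```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

# On Databricks, `spark` is available as a global in pipeline source files.
# cluster_by assumes the raw JSON contains an event_date field.
@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    # Auto Loader (cloudFiles) incrementally picks up new JSON files
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```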

Language Selection:

  • Default to SQL unless the user specifies Python or the task requires it
  • Use SQL for: Transformations, aggregations, filtering, joins (most use cases)
  • Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths
  • Generate ONE language per request unless the user explicitly asks for a mixed-language pipeline

Step 2: Upload to Databricks Workspace

```python
# MCP Tool: upload_folder
upload_folder(
    local_folder="/path/to/my_pipeline",
    workspace_folder="/Workspace/Users/user@example.com/my_pipeline"
)
```
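The workspace_file_paths argument in Step 3 lists every uploaded source file by hand. A minimal sketch for deriving that list from the local folder, assuming upload_folder mirrors the local layout under workspace_folder (as the paths in this section suggest); list_workspace_file_paths is a hypothetical helper, not an MCP tool.

```python
from pathlib import Path, PurePosixPath


def list_workspace_file_paths(local_root, workspace_root):
    """List workspace paths for every .sql/.py file under local_root.

    Assumes the upload preserves the relative layout under workspace_root.
    Non-source files (README.md, etc.) are skipped.
    """
    root = Path(local_root)
    paths = [
        # Workspace paths always use forward slashes, so build them as POSIX.
        str(PurePosixPath(workspace_root) / path.relative_to(root).as_posix())
        for path in root.rglob("*")
        if path.is_file() and path.suffix in {".sql", ".py"}
    ]
    return sorted(paths)
```

The returned list could then be passed as workspace_file_paths to create_or_update_pipeline, so adding a new .sql file to the folder does not require editing the call.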

Step 3: Create/Update and Run Pipeline

Use create_or_update_pipeline - the main entry point. It:

  1. Searches for an existing pipeline with the same name (or uses id from extra_settings)
  2. Creates a new pipeline or updates the existing one
  3. Optionally starts a pipeline run
  4. Optionally waits for completion and returns detailed results

```python
# MCP Tool: create_or_update_pipeline
result = create_or_update_pipeline(
    name="my_orders_pipeline",
    root_path="/Workspace/Users/user@example.com/my_pipeline",
    catalog="my_catalog",
    schema="my_schema",
    workspace_file_paths=[
        "/Workspace/Users/user@example.com/my_pipeline/bronze/ingest_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/silver/clean_orders.sql",
        "/Workspace/Users/user@example.com/my_pipeline/gold/daily_summary.sql"
    ],
    start_run=True,           # Start immediately
    wait_for_completion=True, # Wait and return final status
    full_refresh=True,        # Full refresh all tables
    timeout=1800              # 30 minute timeout
)
```

Result contains actionable information:

```python
{
    "success": True,                    # Did the operation succeed?
    "pipeline_id": "abc-123",           # Pipeline ID for follow-up operations
    "pipeline_name": "my_orders_pipeline",
    "created": True,                    # True if new, False if updated
    "state": "COMPLETED",               # COMPLETED, FAILED, TIMEOUT, etc.
    "catalog": "my_catalog",            # Target catalog
    "schema": "my_schema",              # Target schema
    "duration_seconds": 45.2,           # Time taken
    "message": "Pipeline created and completed successfully in 45.2s. Tables written to my_catalog.my_schema",
    "error_message": None,              # Error summary if failed
    "errors": []                        # Detailed error list if failed
}
```

Step 4: Handle Results

On Success:

```python
if result["success"]:
    # Verify output tables
    stats = get_table_details(
        catalog="my_catalog",
        schema="my_schema",
        table_names=["bronze_orders", "silver_orders", "gold_daily_summary"]
    )
```

On Failure:

```python
if not result["success"]:
    # Message includes suggested next steps
    print(result["message"])
    # "Pipeline created but run failed. State: FAILED. Error: Column 'amount' not found.
    #  Use get_pipeline_events(pipeline_id='abc-123') for full details."

    # Get detailed errors
    events = get_pipeline_events(pipeline_id=result["pipeline_id"], max_results=50)
```
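The success and failure branches can be folded into one decision. A minimal sketch, assuming only the keys shown in the sample result dict (success, state, catalog, schema, pipeline_id, error_message); next_action is a hypothetical helper and its suggestion strings are illustrative, mirroring the follow-ups this section and "Common Issues" recommend.

```python
def next_action(result):
    """Suggest a follow-up step from a create_or_update_pipeline result dict."""
    if result.get("success"):
        # Success: confirm the tables landed where expected.
        return (f"verify tables in {result['catalog']}.{result['schema']} "
                "with get_table_details")
    state = result.get("state", "UNKNOWN")
    if state == "TIMEOUT":
        # The run may still be going; check on it instead of rerunning blindly.
        return "poll with get_update, or rerun with a larger timeout"
    detail = result.get("error_message") or "no error summary"
    return (f"{state}: {detail} Next: get_pipeline_events("
            f"pipeline_id={result.get('pipeline_id')!r})")
```

With the failed run quoted above, this returns a string naming the missing column and the exact get_pipeline_events call to make.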

Step 5: Iterate Until Working

  1. Review errors from result or get_pipeline_events
  2. Fix issues in local files
  3. Re-upload with upload_folder
  4. Run create_or_update_pipeline again (it will update, not recreate)
  5. Repeat until result["success"] == True
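The loop above can be sketched as a bounded retry. This assumes run_pipeline is a hypothetical zero-argument wrapper that calls upload_folder and then create_or_update_pipeline with wait_for_completion=True, and apply_fix is whatever edits the local files based on the failed result; neither is an MCP tool.

```python
def iterate_until_success(run_pipeline, apply_fix, max_attempts=5):
    """Run, inspect, fix, and re-run until the pipeline succeeds.

    run_pipeline: zero-argument callable returning a result dict (hypothetical).
    apply_fix: callable that receives a failed result and edits local files.
    Raises RuntimeError if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        result = run_pipeline()  # re-upload + update, not recreate
        if result.get("success"):
            return result
        if attempt < max_attempts:
            apply_fix(result)  # fix local files before the next upload
    raise RuntimeError(
        f"pipeline still failing after {max_attempts} attempts: "
        f"{result.get('error_message')}"
    )
```

Capping attempts is a design choice rather than part of the workflow: it stops an agent from looping forever on an error that local edits cannot fix, such as a missing permission.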

Quick Reference: MCP Tools

Primary Tool

| Tool | Description |
|---|---|
| create_or_update_pipeline | Main entry point. Creates or updates pipeline, optionally runs and waits. Returns detailed status with success, state, errors, and actionable message. |

Pipeline Management

| Tool | Description |
|---|---|
| find_pipeline_by_name | Find existing pipeline by name, returns pipeline_id |
| get_pipeline | Get pipeline configuration and current state |
| start_update | Start pipeline run (validate_only=True for dry run) |
| get_update | Poll update status (QUEUED, RUNNING, COMPLETED, FAILED) |
| stop_pipeline | Stop a running pipeline |
| get_pipeline_events | Get error messages for debugging failed runs |
| delete_pipeline | Delete a pipeline |
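When a run may outlast the tool's timeout, "Common Issues" suggests wait_for_completion=False plus polling with get_update. A minimal polling sketch, assuming fetch_state is a hypothetical zero-argument wrapper that returns the state string from get_update; the exact shape of get_update's return value is not specified here. COMPLETED and FAILED come from the table above; treating CANCELED as terminal is an assumption.

```python
import time

# States after which an update no longer changes (CANCELED is an assumption).
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def wait_for_update(fetch_state, timeout_s=1800, interval_s=30,
                    clock=time.monotonic, sleep=time.sleep):
    """Poll fetch_state() until a terminal state or until timeout_s elapses.

    Returns the terminal state, or "TIMEOUT" if the deadline passes first.
    clock and sleep are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_s
    while True:
        state = fetch_state()  # e.g. QUEUED, RUNNING, COMPLETED, FAILED
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            return "TIMEOUT"
        sleep(interval_s)
```

A "TIMEOUT" here only means polling stopped; the update itself may still be running and can be checked again later or stopped with stop_pipeline.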

Supporting Tools

| Tool | Description |
|---|---|
| upload_folder | Upload local folder to workspace (files uploaded in parallel) |
| get_table_details | Verify output tables have expected schema and row counts |
| execute_sql | Run ad-hoc SQL to inspect data |

Reference Documentation (Local)

Load these for detailed patterns:


Best Practices (2025)

Language Selection

  • Default to SQL unless the user specifies Python or the task clearly requires it
  • Use SQL for: Transformations, aggregations, filtering, joins (most cases)
  • Use Python for: Complex UDFs, external APIs, ML inference, dynamic paths (use the modern pyspark.pipelines API, imported as dp)
  • Generate ONE language per request unless the user explicitly asks for a mixed-language pipeline

Modern Defaults

  • CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
  • Raw .sql/.py files, not notebooks
  • Serverless compute ONLY - Do not use classic clusters unless explicitly required
  • Unity Catalog (required for serverless)
  • read_files() for cloud storage ingestion - see 1-ingestion-patterns.md

Common Issues

| Issue | Solution |
|---|---|
| Empty output tables | Use get_table_details to verify, then check upstream sources |
| Pipeline stuck in INITIALIZING | Usually normal for serverless while compute starts; wait a few minutes |
| "Column not found" | Check that schemaHints match the actual data |
| Streaming reads fail | Use FROM STREAM(table) for streaming sources |
| Timeout during run | Increase timeout, or use wait_for_completion=False and poll with get_update |
| MV doesn't refresh incrementally | Enable row tracking on source tables |
| SCD2 schema errors | Let SDP infer the __START_AT and __END_AT columns |
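For SCD Type 2 targets, SDP adds the __START_AT and __END_AT columns itself and fills them from the SEQUENCE BY column, which is why the table above recommends letting it infer them. The pure-Python model below is only an illustration of those semantics under simplifying assumptions (no deletes, no late records across batches, sequence column not copied into rows); apply_scd2 is a hypothetical name, not how the engine implements AUTO CDC.

```python
def apply_scd2(history, changes, key, sequence_by):
    """Fold change records into an SCD Type 2 history list (simplified model).

    Each history row holds the change's columns plus __START_AT/__END_AT taken
    from the sequence_by value; __END_AT is None for the current version.
    """
    for change in sorted(changes, key=lambda c: c[sequence_by]):
        seq = change[sequence_by]
        attrs = {k: v for k, v in change.items() if k != sequence_by}
        # Find the open (current) version for this key, if any.
        current = next(
            (row for row in history
             if row[key] == change[key] and row["__END_AT"] is None),
            None,
        )
        if current is not None:
            if all(current.get(k) == v for k, v in attrs.items()):
                continue  # nothing changed: keep the open version
            current["__END_AT"] = seq  # close the previous version
        history.append({**attrs, "__START_AT": seq, "__END_AT": None})
    return history
```

In this model a customer who moves from Oslo to Bergen ends up with two rows: the Oslo row closed at the sequence value of the move, and an open Bergen row starting there.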

For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.


Advanced Pipeline Configuration

For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.


Platform Constraints

Serverless Pipeline Requirements (Default)

| Requirement | Details |
|---|---|
| Unity Catalog | Required; serverless pipelines always use UC |
| Workspace region | Must be a serverless-enabled region |
| Serverless terms | The serverless terms of use must be accepted |
| CDC features | Require serverless, or the Pro/Advanced edition with classic clusters |

Serverless Limitations (When Classic Clusters Required)

| Limitation | Workaround |
|---|---|
| R language | Not supported; use classic clusters if required |
| Spark RDD APIs | Not supported; use classic clusters if required |
| JAR libraries | Not supported; use classic clusters if required |
| Maven coordinates | Not supported; use classic clusters if required |
| DBFS root access | Limited; must use Unity Catalog external locations |
| Global temp views | Not supported |
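The compute rule can be written down as a small check: serverless by default, classic only when a requirement from the limitations table above is present. The requirement labels ("r", "rdd", "jar", "maven") and choose_compute are hypothetical names for illustration only.

```python
# Hypothetical requirement labels mapped to the serverless limitations above.
CLASSIC_ONLY = {
    "r": "R language",
    "rdd": "Spark RDD APIs",
    "jar": "JAR libraries",
    "maven": "Maven coordinates",
}


def choose_compute(requirements):
    """Return ("serverless" or "classic", reasons) for requirement labels."""
    reasons = sorted(
        CLASSIC_ONLY[r.lower()] for r in requirements if r.lower() in CLASSIC_ONLY
    )
    # Any classic-only requirement forces classic; otherwise stay serverless.
    return ("classic", reasons) if reasons else ("serverless", [])
```

Returning the reasons makes it easy to tell the user why a classic cluster was chosen, rather than silently departing from the serverless default.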

General Constraints

| Constraint | Details |
|---|---|
| Schema evolution | Streaming tables require a full refresh for incompatible changes |
| SQL limitations | The PIVOT clause is not supported |
| Sinks | Python only, streaming only, append flows only |

Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.