AgentSkillsCN

ml-pipeline-setup

为Databricks观测性设置提供端到端的编排器,涵盖湖仓监控、AI/BI仪表板,以及SQL告警。引导用户完成黄金表的监控创建、以监控小部件设计仪表板,并通过配置驱动告警。编排器统筹了监测技能的必要依赖(lakehouse-monitoring-comprehensive、databricks-aibi-dashboards、sql-alerting-patterns),以及常用技能(databricks-asset-bundles、databricks-expert-agent、databricks-python-imports)。在端到端设置观测性、创建湖仓监控、构建仪表板,或配置SQL告警时使用此功能。

SKILL.md
--- frontmatter
name: ml-pipeline-setup
description: MLflow and ML Model patterns for Databricks including experiment creation, model training, batch inference, and Unity Catalog integration. Use when implementing ML pipelines, training models with Feature Store, or deploying batch inference jobs. Includes 19 non-negotiable rules covering experiment paths, dataset logging, UC model registration, NaN handling, label binarization, feature engineering workflows, and signature-driven preprocessing.
metadata:
  author: prashanth subrahmanyam
  version: "2.0"
  domain: ml
  role: orchestrator
  pipeline_stage: 8
  pipeline_stage_name: ml
  next_stages:
    - genai-agents-setup
  workers: []
  common_dependencies:
    - databricks-asset-bundles
    - databricks-expert-agent
    - databricks-python-imports
    - naming-tagging-standards
    - databricks-autonomous-operations
  consumes:
    - plans/manifests/ml-manifest.yaml
  consumes_fallback: "Gold table inventory (self-discovery from catalog)"
  last_verified: "2026-02-07"
  volatility: high
  upstream_sources:
    - name: "ai-dev-kit"
      repo: "databricks-solutions/ai-dev-kit"
      paths:
        - "databricks-skills/model-serving/SKILL.md"
        - "databricks-skills/vector-search/SKILL.md"
      relationship: "extended"
      last_synced: "2026-02-09"
      sync_commit: "97a3637"

MLflow & ML Models Patterns

Phase 0: Read Plan (5 minutes)

Before starting implementation, check for a planning manifest that defines what to build.

python
import yaml
from pathlib import Path

manifest_path = Path("plans/manifests/ml-manifest.yaml")

if manifest_path.exists():
    with open(manifest_path) as f:
        manifest = yaml.safe_load(f)
    
    # Extract implementation checklist from manifest
    feature_tables = manifest.get('feature_tables', [])
    models = manifest.get('models', [])
    experiments = manifest.get('experiments', [])
    print(f"Plan: {len(feature_tables)} feature tables, {len(models)} models, {len(experiments)} experiments")
    
    # Each model has: name, domain, model_type, algorithm, feature_table,
    #                  label_column, label_type, business_questions
    # Each feature table has: name, primary_keys, source_gold_tables, features
else:
    # Fallback: self-discovery from Gold tables
    print("No manifest found — falling back to Gold table self-discovery")
    # Discover Gold fact tables, infer feature columns, create one model per domain

If manifest exists: Use it as the implementation checklist. Every feature table, model, and experiment is pre-defined with configuration details. Track completion against the manifest's summary counts.

If manifest doesn't exist: Fall back to self-discovery — inventory Gold fact tables, infer feature columns from numeric columns, and create one model per domain. This works but may miss specific label derivations and business context the planning phase would have defined.


Quick Start (4-6 hours)

Goal: Build production-ready ML pipelines with MLflow 3.1+, Unity Catalog Model Registry, and Databricks Feature Engineering for training-serving consistency.

What You'll Create:

  1. features/create_feature_tables.py - Feature tables in Unity Catalog
  2. {domain}/train_{model_name}.py - Training pipelines with Feature Engineering
  3. inference/batch_inference_all_models.py - Batch scoring with fe.score_batch
  4. Asset Bundle jobs for orchestration

Fast Track:

bash
# 1. Create Feature Tables
databricks bundle run ml_feature_pipeline_job -t dev

# 2. Train all models (parallel)
databricks bundle run ml_training_pipeline_job -t dev

# 3. Run batch inference
databricks bundle run ml_inference_pipeline_job -t dev

Overview

Production-grade patterns for implementing ML pipelines on Databricks using MLflow, Unity Catalog, and Feature Store. Based on production experience with 25 models across 5 domains, achieving 96% inference success rate and 93% reduction in debugging time.

Pattern Origin: December 2025 (Updated: February 6, 2026 - v5.0)


When to Use This Skill

Use this skill when:

  • Implementing ML pipelines on Databricks with MLflow tracking
  • Training models with Feature Store integration
  • Deploying batch inference jobs
  • Registering models to Unity Catalog
  • Troubleshooting MLflow experiment, model registration, or inference errors
  • Setting up Databricks Asset Bundle jobs for ML workflows
  • Creating feature tables in Unity Catalog with proper primary keys and NaN handling

Critical for:

  • Ensuring training and inference consistency via fe.score_batch
  • Preventing common MLflow signature errors
  • Handling data quality issues (NaN, label binarization, single-class data)
  • Configuring serverless ML jobs correctly

Architecture Overview

code
┌─────────────────────────────────────────────────────────────────┐
│                          Gold Layer                              │
│   (fact_tables, dim_tables - source for feature engineering)    │
└───────────────────────┬─────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│              Feature Tables (Unity Catalog)                      │
│  ┌───────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │
│  │ cost_features     │ │ security_features│ │ performance_    │ │
│  │ PK: workspace_id, │ │ PK: user_id,     │ │   features      │ │
│  │     usage_date    │ │     event_date   │ │ PK: warehouse_id│ │
│  └───────────────────┘ └──────────────────┘ │     query_date  │ │
│                                             └─────────────────┘ │
└───────────────────────┬─────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Training Pipelines                            │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  FeatureLookup → create_training_set → train → fe.log_model ││
│  │  (Embeds feature metadata for inference consistency)        ││
│  └─────────────────────────────────────────────────────────────┘│
└───────────────────────┬─────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│             Unity Catalog Model Registry (MLflow 3.1+)           │
│         catalog.{feature_schema}.{model_name}                    │
│         (Model + Feature Lookup Metadata embedded)               │
└───────────────────────┬─────────────────────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Inference Layer                             │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  fe.score_batch(model_uri, df_with_lookup_keys_only)     │   │
│  │  → Automatically retrieves features from Feature Tables  │   │
│  │  → Guarantees training-serving consistency               │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Directory Structure

code
src/{project}_ml/
├── features/
│   └── create_feature_tables.py     # Feature table creation
├── cost/
│   ├── train_budget_forecaster.py
│   ├── train_cost_anomaly_detector.py
│   └── train_chargeback_attribution.py
├── security/
│   └── train_security_threat_detector.py
├── performance/
│   └── train_query_performance_forecaster.py
├── reliability/
│   └── train_job_failure_predictor.py
├── quality/
│   └── train_data_drift_detector.py
├── inference/
│   └── batch_inference_all_models.py  # Uses fe.score_batch
└── README.md

resources/ml/
├── ml_feature_pipeline_job.yml       # Feature table creation
├── ml_training_pipeline_job.yml      # Training orchestrator
└── ml_inference_pipeline_job.yml     # Batch inference

Critical Rules (Quick Reference)

#RulePatternWhy It Fails Otherwise
0Pin Package Versionsmlflow==3.7.0 (exact) in training AND inferenceVersion mismatch warnings, deserialization failures
1Experiment Path/Shared/{project}_ml_{model_name}/Users/... fails silently if subfolder doesn't exist
2Dataset LoggingInside mlflow.start_run() contextWon't associate with run, invisible in UI
3Exit Signaldbutils.notebook.exit("SUCCESS")Job status unclear, may show SUCCESS on failure
4UC Model LoggingUse fe.log_model() with output_schema (PRIMARY) or infer_signature (alternative)Unity Catalog rejects models without output spec
5Feature Engineering WorkflowFeatureLookup + create_training_set + fe.log_modelFeature skew between training and inference
6NaN Handling at SourceClean NaN/Inf at feature table creation with clean_numeric()sklearn GradientBoosting fails at inference; XGBoost handles NaN but sklearn doesn't
7Label BinarizationConvert 0-1 rates to binary for classifiersXGBoostError: base_score must be in (0,1)
8Single-Class CheckVerify label distribution before trainingClassifier can't train on all-same labels
9Exclude LabelsUse exclude_columns=[LABEL_COLUMN] in create_training_setLabel included as feature causes inference failure
10Label Type CastingCast to INT (classification) or DOUBLE (regression) before trainingType mismatch in model output
11Double Type CastingCast ALL numeric features to DOUBLE in feature tablesMLflow signatures reject DecimalType
12Lookup Keys Match PKslookup_key MUST match Feature Table primary keys EXACTLYUnable to find feature errors
13Use fe.score_batchUse fe.score_batch NOT manual feature joins for inferenceAutomatic feature retrieval ensures training-serving consistency
14Feature RegistryQuery feature table schemas dynamicallyHardcoded feature lists drift out of sync
15Custom InferenceSeparate task for TF-IDF/NLP models that need runtime featuresfe.score_batch() can't compute runtime features
16Helper Functions InlineALWAYS inline helper functions (don't import modules)ModuleNotFoundError in serverless Asset Bundle notebooks
17Bundle Path SetupUse sys.path.insert(0, _bundle_root) patternModule imports fail in serverless
18Standardized TemplatesCopy-and-customize from skill templates (don't roll custom)Custom implementations miss edge cases

Core Patterns (Quick Examples)

Experiment Setup

python
# ✅ CORRECT: Always use /Shared/ path
experiment_name = f"/Shared/{project}_ml_{model_name}"
mlflow.set_experiment(experiment_name)

See: Experiment Patterns for full details

Model Registration with Feature Store

python
from databricks.feature_engineering import FeatureEngineeringClient
from mlflow.types import ColSpec, DataType, Schema

fe = FeatureEngineeringClient()
mlflow.set_registry_uri("databricks-uc")

# Regression: output_schema = Schema([ColSpec(DataType.double)])
# Classification: output_schema = Schema([ColSpec(DataType.long)])

fe.log_model(
    model=model,
    artifact_path="model",
    flavor=mlflow.sklearn,  # REQUIRED
    training_set=training_set,
    registered_model_name=f"{catalog}.{schema}.{model_name}",
    infer_input_example=True,
    output_schema=output_schema  # REQUIRED for UC
)

See: Model Registry for full patterns by model type

Feature Table Creation with NaN Cleaning

python
# ✅ CORRECT: Clean NaN/Inf at source for sklearn compatibility
from pyspark.sql.functions import F, isnan
from pyspark.sql.types import DoubleType

def clean_numeric(col_name):
    return F.when(
        F.col(col_name).isNull() | isnan(F.col(col_name)) |
        (F.col(col_name) == float('inf')) | (F.col(col_name) == float('-inf')),
        F.lit(0.0)
    ).otherwise(F.col(col_name))

# Apply to ALL DoubleType columns
for field in df.schema.fields:
    if isinstance(field.dataType, DoubleType):
        df = df.withColumn(field.name, clean_numeric(field.name))

See: Data Quality Patterns for full patterns

Batch Inference with fe.score_batch

python
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# ✅ CORRECT: scoring_df has ONLY lookup keys (features auto-retrieved)
scoring_df = spark.table(feature_table).select(*lookup_keys).distinct()

predictions_df = fe.score_batch(
    model_uri=model_uri,
    df=scoring_df  # Contains only lookup keys
)

See: Feature Engineering Workflow for full inference patterns

Asset Bundle Job Configuration

yaml
resources:
  jobs:
    ml_training_job:
      environments:
        - environment_key: default
          spec:
            environment_version: "4"
            dependencies:
              - "mlflow==3.7.0"  # Pin exact version
              - "xgboost==2.0.3"
      tasks:
        - task_key: train_model
          notebook_task:
            notebook_path: ../../src/ml/models/train.py
            base_parameters:  # NOT parameters with --flags!
              catalog: ${var.catalog}
              model_name: my_model
      # ❌ DO NOT define experiments in Asset Bundle

See: DAB Integration for full patterns


Reference Files

Detailed documentation is organized in the references/ directory:

Experiment Patterns

Complete experiment setup, tracking, metric logging, dataset logging, hyperparameter tuning. Covers /Shared/ vs /Users/ paths, run context requirements, helper function inlining, exit signals, and common errors.

Model Registry

Model registration, versioning, aliases, deployment patterns, serving endpoints. Covers Unity Catalog integration with fe.log_model(), output schema patterns by model type (regression, classification, anomaly detection), signature-driven preprocessing, and model loading from UC. Documents both output_schema (primary) and infer_signature (alternative) approaches.

DAB Integration

Asset Bundle integration, notebook patterns, inline helpers, parameter passing. Covers training and inference job configuration, package version pinning, base_parameters vs argparse, serverless environment setup, and common deployment errors.

Feature Store Patterns

Feature table creation, feature lookup configuration, column conflict resolution, Feature Registry pattern for dynamic schema querying. Covers training set creation, feature lookups, and inference patterns.

Data Quality Patterns

NaN/Inf handling at feature table source, label binarization for XGBoost classifiers, single-class data detection, feature column exclusion. Covers sklearn vs XGBoost compatibility, preprocessing requirements, and training/inference checklists.

Troubleshooting

Comprehensive error reference table, schema verification patterns, SCD2 vs regular dimension table handling, pre-development checklist. Covers common MLflow errors, signature issues, and debugging workflows.

Requirements Template

Fill-in-the-blank requirements template for ML projects. Includes project context (catalog, schemas), feature table inventory (primary keys, features, source tables), model inventory (type, algorithm, label column, label type), and label type reference (regression vs classification vs anomaly detection casting).

Feature Engineering Workflow

End-to-end Feature Engineering workflow with FeatureLookup, create_training_set, fe.log_model, and fe.score_batch. Covers feature table creation with NaN cleaning, training set creation with proper base_df (ONLY keys + label), model logging with embedded feature metadata, and batch inference with automatic feature retrieval.


Scripts

setup_experiment.py

Utility functions for MLflow experiment setup. CRITICAL: These functions should be INLINED in each training notebook, not imported.

Functions:

  • setup_mlflow_experiment(model_name) - Set up experiment with /Shared/ path
  • log_training_dataset(spark, catalog, schema, table_name) - Log dataset inside run context
  • get_run_name(model_name, algorithm, version) - Generate descriptive run names
  • get_standard_tags(...) - Get standard MLflow tags
  • get_parameters() - Get job parameters from dbutils widgets (returns 4: catalog, gold_schema, feature_schema, model_name)

Usage:

python
# Copy these functions into your notebook (don't import)
# See scripts/setup_experiment.py for full implementation

create_feature_tables_template.py

Complete feature table creation template with NaN/Inf cleaning at source, DOUBLE type casting, PK NULL filtering, rolling window aggregations, and proper fe.create_table() calls. Uses 3-parameter get_parameters() (catalog, gold_schema, feature_schema).

Key functions:

  • get_parameters() - Returns (catalog, gold_schema, feature_schema) — no model_name needed
  • create_feature_table(spark, fe, features_df, ...) - Create with NaN cleaning + PK filtering
  • compute_{domain}_features(spark, catalog, gold_schema) - Domain-specific feature engineering
  • main() - Orchestrates schema creation and all feature tables

train_model_template.py

Complete training pipeline template using Feature Engineering with FeatureLookup, create_training_set, and fe.log_model with output_schema. Includes inline helpers, label type casting, label binarization, single-class detection, and proper exit signals.

Key functions:

  • setup_mlflow_experiment(model_name) - Inlined /Shared/ path setup
  • get_parameters() - Returns (catalog, gold_schema, feature_schema, model_name)
  • create_training_set_with_features(...) - FeatureLookup + create_training_set
  • prepare_and_train(training_df, ...) - Data prep with DECIMAL→DOUBLE + train/eval
  • log_model_with_feature_engineering(...) - fe.log_model() with output_schema
  • main() - Full pipeline with error handling + exit signal

batch_inference_template.py

Complete batch inference template using fe.score_batch for automatic feature retrieval. Supports multi-model scoring loop with per-model error isolation and PARTIAL_FAILURE exit signal.

Key functions:

  • get_parameters() - Returns (catalog, gold_schema, feature_schema)
  • load_model_uri(catalog, feature_schema, model_name) - Get latest model version URI
  • score_with_feature_engineering(...) - fe.score_batch() + metadata columns + Delta save
  • run_inference_for_model(...) - Single-model inference with error isolation
  • main() - Multi-model loop with summary + PARTIAL_FAILURE handling

Assets

ml-feature-pipeline-job.yaml

Asset Bundle job template for feature table creation. Includes databricks-feature-engineering dependency, notebook_task with base_parameters (catalog, gold_schema, feature_schema), and proper tags.

Usage:

yaml
# Copy and customize for your project
# Replace {project} placeholder in notebook_path

ml-training-pipeline-job.yaml

Asset Bundle job template for parallel model training. Includes pinned package versions (mlflow==3.7.0, scikit-learn==1.3.2, xgboost==2.0.3), multiple parallel tasks (one per model), weekly schedule (paused by default), and 4-hour timeout. Does NOT define experiments (experiments created in notebook code).

Usage:

yaml
# Copy and customize: add one task_key per model
# Pin package versions to match your training environment

ml-inference-pipeline-job.yaml

Asset Bundle job template for batch inference with fe.score_batch. Includes pinned package versions (MUST match training), daily schedule (paused by default), 2-hour timeout, and proper tags.

Usage:

yaml
# Copy and customize for your project
# Package versions MUST match training pipeline exactly

Quick Validation Checklists

Pre-Development

  • Verify ALL column names against Gold layer YAML schemas
  • Check if dimension tables are SCD2 (has is_current?)
  • Confirm data types (DECIMAL, STRING, BOOLEAN, etc.)
  • Identify categorical columns needing encoding
  • Review Feature Store tables if using feature lookups
  • Fill in Requirements Template

MLflow Setup

  • Use /Shared/{project}_ml_{model_name} experiment path
  • Do NOT define experiments in Asset Bundle
  • Inline ALL helper functions (no module imports)
  • Set mlflow.set_registry_uri("databricks-uc") at module level
  • Add mlflow.autolog(disable=True) before custom logging

Feature Table Creation

  • Cast ALL numeric columns to DOUBLE
  • Clean NaN/Inf at source with clean_numeric() (sklearn compatibility)
  • Filter NULL primary key rows before writing
  • All columns verified against Gold layer schema
  • Feature table has descriptive description
  • Rolling window aggregations use proper Window specs

Feature Engineering (Training Set)

  • Uses FeatureLookup for feature retrieval
  • base_df has ONLY lookup keys + label column
  • lookup_key matches feature table primary keys EXACTLY
  • training_set passed to fe.log_model (embeds feature metadata)
  • Exclude Labels: Use exclude_columns=[LABEL_COLUMN] — label MUST NOT be a feature

Training Pipeline

  • Log dataset INSIDE mlflow.start_run() context
  • Use fe.log_model() with flavor=mlflow.sklearn and output_schema (PRIMARY)
  • input_example provided (or use infer_input_example=True)
  • Register model with 3-level name: catalog.schema.model_name
  • Label column CAST to correct type (INT for classification, DOUBLE for regression)
  • All features CAST to float64 before training
  • Label Binarization: Binarize 0-1 rates for XGBClassifier
  • Single-Class Check: Verify label has multiple classes before training classifier
  • Add dbutils.notebook.exit("SUCCESS") at end
  • All helper functions inlined (not imported)

Batch Inference

  • Uses fe.score_batch for automatic feature retrieval
  • Scoring DataFrame has ONLY lookup keys
  • Model URI points to latest version
  • Predictions saved with metadata columns (model_name, model_uri, scored_at)
  • Verify feature table has no NaN before inference (sklearn compatibility)
  • Error handling with proper exit signals (SUCCESS / PARTIAL_FAILURE / FAILED)
  • Test with small batch before full inference

Job Configuration

  • Use notebook_task with base_parameters (not argparse)
  • Pin exact package versions (match training and inference)
  • DO NOT define experiments in Asset Bundle

See: Troubleshooting for detailed checklists


Time Estimates

TaskDuration
Feature Tables Setup2-3 hours
First Model (with FE)3-4 hours
Additional Models (each)1-2 hours
Batch Inference Pipeline2-3 hours
Asset Bundle Configuration1 hour
Total (5 models)10-16 hours

Version History

v5.0 (February 2026)

  • Merged comprehensive implementation guide (12-ml-models-prompt.md)
  • Added Quick Start, Architecture, Directory Structure, Time Estimates
  • Expanded to 19 non-negotiable rules (merged from 10+16)
  • Created 3 complete script templates (feature tables, training, inference)
  • Split asset templates into 3 separate job YAMLs
  • Added requirements-template.md and feature-engineering-workflow.md
  • Replaced hardcoded project names with {project} placeholders
  • Resolved model registration conflict (output_schema as primary)
  • Resolved NaN handling (clean at source, not training time)

Future Enhancements (v6.0)

  • Model aliases (@champion/@challenger) for lifecycle management
  • Hyperparameter tuning (Hyperopt/Optuna + MLflow)
  • Feature importance logging and visualization
  • Cross-validation patterns (TimeSeriesSplit)
  • Prediction monitoring integration with Lakehouse Monitoring
  • AutoML baseline patterns

v4.0 (January 14, 2026)

  • Restructured to comply with AgentSkills.io specification
  • Split into reference files for better organization
  • Extracted scripts and templates

v3.0 (January 4, 2026)

  • NaN handling at feature table source
  • Label binarization patterns
  • Single-class data detection
  • Feature column exclusion

v2.0 (January 2026)

  • fe.log_model() and output_schema patterns
  • Model type to DataType mapping

v1.0 (December 2025)

  • Initial patterns from 5 model implementation

Pipeline Progression

Previous stage: monitoring/00-observability-setup → Monitoring, dashboards, and alerts should be configured

Next stage: After completing ML setup, proceed to:

  • genai-agents/00-genai-agents-setup — Implement GenAI agents with ResponsesAgent, Genie Spaces, and evaluation

References

Official Documentation

Feature Engineering

Related Skills

  • databricks-python-imports - sys.path setup for Asset Bundles
  • databricks-asset-bundles - Infrastructure-as-code patterns
  • databricks-autonomous-operations - Troubleshooting: Read when jobs fail — provides Deploy → Poll → Diagnose → Fix → Redeploy autonomous loop, error-solution matrix, and self-healing patterns