AgentSkillsCN

ml-pipeline-expert

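Vertex AI model training, feature engineering, model deployment, and monitoring. Use this skill when building machine learning pipelines, training models, or implementing MLOps workflows.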

SKILL.md
--- frontmatter
name: ml-pipeline-expert
description: Vertex AI model training, feature engineering, model deployment, monitoring. Use when building ML pipelines, training models, or implementing MLOps workflows.
allowed-tools: Read, Write, Bash, Grep
model: claude-opus-4-5-20251101

ML Pipeline Expert Skill

When to Use This Skill

Activate when:

  • Training pricing prediction models
  • Building feature engineering pipelines
  • Deploying models to Vertex AI
  • Setting up model monitoring
  • Implementing MLOps workflows
  • Debugging model performance issues

Core Patterns

1. Feature Engineering in BigQuery

sql
-- models/ml/features_pricing_model.sql
-- Feature table for creator pricing prediction

WITH creator_stats AS (
    SELECT
        creator_id,
        platform,
        follower_count,
        AVG(engagement_rate) AS avg_engagement_rate,
        COUNT(DISTINCT video_id) AS video_count_30d,
        AVG(views) AS avg_views_per_video
    FROM {{ ref('int_creator_video_metrics') }}
    WHERE published_at >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
    GROUP BY 1, 2, 3
),

audience_quality AS (
    SELECT
        creator_id,
        -- SAFE_DIVIDE returns NULL instead of failing when duration_seconds is 0
        AVG(SAFE_DIVIDE(watch_time_seconds, duration_seconds)) AS avg_completion_rate,
        COUNT(DISTINCT country) AS country_diversity,
        -- Audience demographics (derived from platform insights)
        AVG(CASE WHEN age_group = '18-24' THEN 1 ELSE 0 END) AS pct_age_18_24,
        AVG(CASE WHEN age_group = '25-34' THEN 1 ELSE 0 END) AS pct_age_25_34
    FROM {{ ref('int_audience_demographics') }}
    GROUP BY 1
),

historical_deals AS (
    SELECT
        creator_id,
        AVG(deal_value_usd) AS avg_historical_rate,
        COUNT(*) AS deal_count,
        MAX(deal_date) AS last_deal_date
    FROM {{ ref('fact_creator_deals') }}
    WHERE deal_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    GROUP BY 1
),

niche_benchmarks AS (
    SELECT
        c.creator_id,
        c.niche_category,
        AVG(d.deal_value_usd) AS niche_avg_rate
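    FROM {{ ref('dim_creator') }} c
    LEFT JOIN {{ ref('fact_creator_deals') }} d
        ON c.niche_category = d.niche_category
        -- Date filter lives in the ON clause; in a WHERE clause it would turn
        -- the LEFT JOIN into an inner join and drop creators without recent niche deals
        AND d.deal_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    GROUP BY 1, 2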
),

features AS (
    SELECT
        cs.creator_id,
        cs.platform,
        
        -- Raw features
        cs.follower_count,
        cs.avg_engagement_rate,
        cs.video_count_30d,
        cs.avg_views_per_video,
        
        -- Derived features
        LOG(cs.follower_count + 1) AS log_follower_count,
        cs.avg_views_per_video / NULLIF(cs.follower_count, 0) AS view_rate,
        
        -- Audience quality
        aq.avg_completion_rate,
        aq.country_diversity,
        aq.pct_age_18_24,
        aq.pct_age_25_34,
        
        -- Historical context
        hd.avg_historical_rate,
        hd.deal_count,
        DATE_DIFF(CURRENT_DATE(), hd.last_deal_date, DAY) AS days_since_last_deal,
        
        -- Niche context
        nb.niche_avg_rate,
        
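        -- Target variable (for training). This is the same value as
        -- avg_historical_rate above, so avg_historical_rate must not be used as a model input
        hd.avg_historical_rate AS target_rate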
        
    FROM creator_stats cs
    LEFT JOIN audience_quality aq ON cs.creator_id = aq.creator_id
    LEFT JOIN historical_deals hd ON cs.creator_id = hd.creator_id
    LEFT JOIN niche_benchmarks nb ON cs.creator_id = nb.creator_id
    WHERE hd.avg_historical_rate IS NOT NULL  -- Only creators with deal history
)

SELECT * FROM features

Feature Engineering Best Practices:

  • Log transforms: For skewed distributions (follower_count)
  • Ratios: More informative than raw counts (view_rate)
  • Temporal features: Days since last event
  • Categorical encoding: One-hot or target encoding
  • Null handling: Use NULLIF, COALESCE
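
The practices above can be sketched in plain Python for quick local checks. This is a minimal illustration, not the pipeline's implementation: the helper names (`log_count`, `safe_ratio`, `days_since`, `one_hot`) and the sample values are assumptions made for this example.

```python
import math
from datetime import date
from typing import Optional


def log_count(count: int) -> float:
    """Natural log of (count + 1), mirroring LOG(follower_count + 1)."""
    return math.log(count + 1)


def safe_ratio(numerator: float, denominator: float) -> Optional[float]:
    """Return numerator / denominator, or None when the denominator is 0.

    Mirrors the SQL pattern x / NULLIF(y, 0), which yields NULL rather than
    a division-by-zero error.
    """
    if denominator == 0:
        return None
    return numerator / denominator


def days_since(event_date: date, today: date) -> int:
    """Temporal feature: whole days between an event and today."""
    return (today - event_date).days


def one_hot(value: str, categories: list) -> dict:
    """One-hot encode a categorical value against a fixed category list."""
    return {f"is_{c}": int(value == c) for c in categories}


if __name__ == "__main__":
    print(log_count(9999))
    print(safe_ratio(5000, 0))
    print(days_since(date(2026, 1, 1), date(2026, 1, 31)))
    print(one_hot("youtube", ["youtube", "tiktok"]))
```

Returning `None` for a zero denominator leaves the decision about imputation to the training step, in the same way the SQL leaves a NULL.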

2. Train/Test Split in BigQuery

sql
-- models/ml/features_pricing_model_split.sql

WITH features AS (
    SELECT * FROM {{ ref('features_pricing_model') }}
),

split AS (
    SELECT
        *,
        -- Deterministic split based on creator_id hash
        MOD(ABS(FARM_FINGERPRINT(creator_id)), 100) AS random_bucket
    FROM features
)

SELECT
    * EXCEPT(random_bucket),
    CASE 
        WHEN random_bucket < 80 THEN 'train'
        WHEN random_bucket < 90 THEN 'val'
        ELSE 'test'
    END AS split
FROM split

Split Strategy:

  • 80/10/10: Train/Val/Test
  • Deterministic: Same creator always in same split
  • Stratified: The hash split does not stratify by itself; check that the target distribution is balanced across splits
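
The same bucketing idea can be reproduced outside BigQuery. The sketch below is an assumption-laden illustration: it uses SHA-256 from the standard library rather than FARM_FINGERPRINT, so the individual assignments will not match the SQL above, and `assign_split` is a hypothetical helper name.

```python
import hashlib
from collections import Counter


def assign_split(creator_id: str) -> str:
    """Map a creator ID to 'train', 'val', or 'test' deterministically."""
    digest = hashlib.sha256(creator_id.encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then bucket into 0..99
    bucket = int.from_bytes(digest[:8], "big") % 100
    if bucket < 80:
        return "train"
    if bucket < 90:
        return "val"
    return "test"


if __name__ == "__main__":
    # Made-up IDs, only to show the approximate 80/10/10 proportions
    counts = Counter(assign_split(f"creator_{i}") for i in range(10_000))
    print(counts)
```

Because the split depends only on the ID, re-running the pipeline or adding new creators never moves an existing creator between splits.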

3. Model Training Pipeline (Python)

python
# src/ml/train_pricing_model.py

from google.cloud import bigquery, aiplatform
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, r2_score
import pandas as pd
import joblib
from datetime import datetime

class PricingModelTrainer:
    """
    Train a gradient boosting pricing model (scikit-learn) using BigQuery features
    """
    
    def __init__(self, project_id: str, dataset: str):
        self.project_id = project_id
        self.dataset = dataset
        self.client = bigquery.Client(project=project_id)
        
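    def load_features(self, split: str) -> pd.DataFrame:
        """Load features from BigQuery"""
        # Pass the split name as a query parameter instead of interpolating it into the SQL
        query = f"""
        SELECT * EXCEPT(creator_id, split)
        FROM `{self.project_id}.{self.dataset}.features_pricing_model_split`
        WHERE split = @split
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter('split', 'STRING', split)]
        )
        return self.client.query(query, job_config=job_config).to_dataframe()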
    
    def train(self, config: dict) -> tuple[dict, str]:
        """
        Train pricing model
        
        Args:
            config: Model hyperparameters
            
        Returns:
            metrics: Training and validation metrics
            model_path: Local path of the saved model file
        """
        # Load data
        train_df = self.load_features('train')
        val_df = self.load_features('val')
        
        # Separate features and target
        feature_cols = [
            'log_follower_count',
            'avg_engagement_rate',
            'video_count_30d',
            'view_rate',
            'avg_completion_rate',
            'country_diversity',
            'pct_age_18_24',
            'pct_age_25_34',
            'days_since_last_deal',
            'niche_avg_rate'
        ]
        
        X_train = train_df[feature_cols]
        y_train = train_df['target_rate']
        
        X_val = val_df[feature_cols]
        y_val = val_df['target_rate']
        
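        # Handle missing values: impute both splits with training-set medians,
        # computed before any filling so the statistics are not distorted
        train_medians = X_train.median()
        X_train = X_train.fillna(train_medians)
        X_val = X_val.fillna(train_medians)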
        
        # Train model
        model = GradientBoostingRegressor(
            n_estimators=config.get('n_estimators', 100),
            learning_rate=config.get('learning_rate', 0.1),
            max_depth=config.get('max_depth', 5),
            random_state=42
        )
        
        model.fit(X_train, y_train)
        
        # Evaluate
        train_preds = model.predict(X_train)
        val_preds = model.predict(X_val)
        
        metrics = {
            'train_mae': mean_absolute_error(y_train, train_preds),
            'train_r2': r2_score(y_train, train_preds),
            'val_mae': mean_absolute_error(y_val, val_preds),
            'val_r2': r2_score(y_val, val_preds),
            'feature_importance': dict(zip(
                feature_cols,
                model.feature_importances_
            ))
        }
        
        # Save model
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        model_path = f'models/pricing_model_{timestamp}.joblib'
        joblib.dump(model, model_path)
        
        return metrics, model_path
    
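    def evaluate_test_set(self, model_path: str) -> dict:
        """Evaluate on held-out test set"""
        model = joblib.load(model_path)
        test_df = self.load_features('test')
        
        # Use exactly the columns the model was fitted on. scikit-learn >= 1.0 records
        # them in feature_names_in_ when fit on a DataFrame. Other columns in the table
        # (platform, follower_count, avg_historical_rate, ...) must not reach predict()
        feature_cols = list(model.feature_names_in_)
        # Note: this imputes with test-set medians, not the training medians used in train()
        X_test = test_df[feature_cols].fillna(test_df[feature_cols].median())
        y_test = test_df['target_rate']
        
        test_preds = model.predict(X_test)
        
        return {
            'test_mae': mean_absolute_error(y_test, test_preds),
            'test_r2': r2_score(y_test, test_preds),
            'test_samples': len(test_df)
        }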


# Usage
if __name__ == "__main__":
    trainer = PricingModelTrainer(
        project_id='sponsorgraph-prod',
        dataset='ml'
    )
    
    config = {
        'n_estimators': 200,
        'learning_rate': 0.05,
        'max_depth': 6
    }
    
    metrics, model_path = trainer.train(config)
    print(f"Training complete: {metrics}")
    
    test_metrics = trainer.evaluate_test_set(model_path)
    print(f"Test set performance: {test_metrics}")
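
The training script imputes missing values with training-set medians. One way to keep imputation identical at test and serving time is to compute the medians once and reuse them. The following is a minimal standard-library sketch of that idea; `fit_medians` and `impute` are hypothetical helpers, not part of the script above, and the fallback of 0.0 for an all-missing column is an assumption.

```python
from statistics import median


def fit_medians(rows: list, columns: list) -> dict:
    """Compute per-column medians from training rows, ignoring missing values."""
    medians = {}
    for col in columns:
        observed = [r[col] for r in rows if r.get(col) is not None]
        # Assumption: fall back to 0.0 when a column has no observed values
        medians[col] = median(observed) if observed else 0.0
    return medians


def impute(rows: list, medians: dict) -> list:
    """Fill missing values in any split using the stored training medians."""
    return [
        {col: (r[col] if r.get(col) is not None else fill)
         for col, fill in medians.items()}
        for r in rows
    ]


if __name__ == "__main__":
    train_rows = [{"view_rate": 1.0}, {"view_rate": 3.0}, {"view_rate": None}]
    test_rows = [{"view_rate": None}, {"view_rate": 5.0}]
    medians = fit_medians(train_rows, ["view_rate"])
    print(impute(test_rows, medians))
```

Storing the medians next to the model artifact (for example in its metadata) would let the serving path apply the same fill values.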

4. Deploy to Vertex AI

python
# src/ml/deploy_model.py

from google.cloud import aiplatform
from google.cloud import storage
import joblib

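def upload_model_to_gcs(local_path: str, bucket_name: str) -> str:
    """Upload model to GCS and return the artifact directory URI"""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # Vertex AI's prebuilt scikit-learn container expects artifact_uri to be a
    # directory that contains a file named model.joblib (or model.pkl)
    model_dir = f"models/{local_path.split('/')[-1].removesuffix('.joblib')}"
    blob = bucket.blob(f"{model_dir}/model.joblib")
    blob.upload_from_filename(local_path)
    
    return f"gs://{bucket_name}/{model_dir}"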

def deploy_to_vertex_ai(
    model_path: str,
    model_name: str,
    project_id: str,
    location: str = 'us-central1'
) -> str:
    """
    Deploy model to Vertex AI endpoint
    
    Returns:
        endpoint_id: Deployed endpoint ID
    """
    aiplatform.init(project=project_id, location=location)
    
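    # Upload model. artifact_uri must point to a GCS directory containing model.joblib,
    # and the container's scikit-learn version should match the version used for training
    model = aiplatform.Model.upload(
        display_name=model_name,
        artifact_uri=model_path,
        serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest'
    )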
    
    # Create endpoint
    endpoint = aiplatform.Endpoint.create(
        display_name=f"{model_name}_endpoint"
    )
    
    # Deploy model to endpoint
    model.deploy(
        endpoint=endpoint,
        deployed_model_display_name=model_name,
        machine_type='n1-standard-4',
        min_replica_count=1,
        max_replica_count=5,
        traffic_percentage=100
    )
    
    return endpoint.resource_name


# Usage
if __name__ == "__main__":
    # Upload model to GCS
    gcs_path = upload_model_to_gcs(
        local_path='models/pricing_model_20260207.joblib',
        bucket_name='sponsorgraph-models'
    )
    
    # Deploy to Vertex AI
    endpoint_id = deploy_to_vertex_ai(
        model_path=gcs_path,
        model_name='pricing_model_v1',
        project_id='sponsorgraph-prod'
    )
    
    print(f"Model deployed to endpoint: {endpoint_id}")
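
Once the model is deployed, a scikit-learn endpoint takes each instance as a list of feature values in the order used for training. The sketch below only builds that payload; the feature order is copied from `feature_cols` in the training script, and `build_instance` is a hypothetical helper introduced for illustration.

```python
# Feature order copied from feature_cols in the training script. The model
# predicts on positional values, so the order must match training.
FEATURE_ORDER = [
    "log_follower_count",
    "avg_engagement_rate",
    "video_count_30d",
    "view_rate",
    "avg_completion_rate",
    "country_diversity",
    "pct_age_18_24",
    "pct_age_25_34",
    "days_since_last_deal",
    "niche_avg_rate",
]


def build_instance(features: dict) -> list:
    """Turn a feature dict into one ordered instance (hypothetical helper)."""
    missing = [name for name in FEATURE_ORDER if features.get(name) is None]
    if missing:
        # Fail loudly rather than send a misaligned or partial instance
        raise ValueError(f"Missing feature values: {missing}")
    return [float(features[name]) for name in FEATURE_ORDER]


if __name__ == "__main__":
    sample = {name: 0.5 for name in FEATURE_ORDER}  # made-up values
    print(build_instance(sample))
```

With credentials configured, the instance could then be sent with `aiplatform.Endpoint(endpoint_id).predict(instances=[instance])`, where `endpoint_id` is the resource name returned by `deploy_to_vertex_ai`.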

5. Model Monitoring

python
# src/ml/monitor_model.py

from google.cloud import bigquery
from datetime import datetime, timezone
import pandas as pd

class ModelMonitor:
    """Monitor model predictions and detect drift"""
    
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        
    def log_prediction(self, prediction_data: dict):
        """Log prediction to BigQuery for monitoring"""
        table_id = f"{self.project_id}.ml.model_predictions"
        
        rows_to_insert = [{
            'prediction_id': prediction_data['prediction_id'],
            'creator_id': prediction_data['creator_id'],
            'predicted_rate': prediction_data['predicted_rate'],
            'features': prediction_data['features'],
            'model_version': prediction_data['model_version'],
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            'timestamp': datetime.now(timezone.utc).isoformat()
        }]
        
        errors = self.bq_client.insert_rows_json(table_id, rows_to_insert)
        if errors:
            print(f"Errors logging prediction: {errors}")
    
    def check_prediction_drift(self, days: int = 7) -> dict:
        """
        Check if prediction distribution has drifted
        
        Returns:
            drift_metrics: Mean, std, quantiles over time
        """
        query = f"""
        WITH recent_predictions AS (
            SELECT
                DATE(timestamp) AS pred_date,
                predicted_rate
            FROM `{self.project_id}.ml.model_predictions`
            WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
        ),
        
        daily_stats AS (
            SELECT
                pred_date,
                AVG(predicted_rate) AS mean_pred,
                STDDEV(predicted_rate) AS std_pred,
                APPROX_QUANTILES(predicted_rate, 100)[OFFSET(50)] AS median_pred,
                COUNT(*) AS prediction_count
            FROM recent_predictions
            GROUP BY 1
        )
        
        SELECT * FROM daily_stats
        ORDER BY pred_date
        """
        
        df = self.bq_client.query(query).to_dataframe()
        
        # Calculate drift score: coefficient of variation of the daily means,
        # i.e. how much the average prediction moves from day to day
        drift_score = df['mean_pred'].std() / df['mean_pred'].mean()
        
        return {
            'drift_score': drift_score,
            'daily_stats': df.to_dict('records'),
            'alert': drift_score > 0.3  # Alert if CV > 30%
        }
    
    def compare_with_actuals(self) -> dict:
        """
        Compare predictions with actual deal values
        For predictions where we later got actual data
        """
        query = f"""
        WITH predictions AS (
            SELECT
                creator_id,
                predicted_rate,
                timestamp AS pred_timestamp
            FROM `{self.project_id}.ml.model_predictions`
        ),
        
        actuals AS (
            SELECT
                creator_id,
                deal_value_usd AS actual_rate,
                deal_date
            FROM `{self.project_id}.mart.fact_creator_deals`
        ),
        
        matched AS (
            SELECT
                p.creator_id,
                p.predicted_rate,
                a.actual_rate,
                ABS(p.predicted_rate - a.actual_rate) AS absolute_error,
                SAFE_DIVIDE(ABS(p.predicted_rate - a.actual_rate), a.actual_rate) AS pct_error
            FROM predictions p
            INNER JOIN actuals a
                ON p.creator_id = a.creator_id
                AND DATE(a.deal_date) >= DATE(p.pred_timestamp)
                AND DATE(a.deal_date) <= DATE_ADD(DATE(p.pred_timestamp), INTERVAL 30 DAY)
        )
        
        SELECT
            AVG(absolute_error) AS mae,
            AVG(pct_error) AS mape,
            COUNT(*) AS sample_size
        FROM matched
        """
        
        result = self.bq_client.query(query).to_dataframe()
        return result.iloc[0].to_dict()


# Usage
if __name__ == "__main__":
    monitor = ModelMonitor(project_id='sponsorgraph-prod')
    
    # Check for drift
    drift_metrics = monitor.check_prediction_drift(days=7)
    
    if drift_metrics['alert']:
        print("⚠️ Model drift detected!")
        print(f"Drift score: {drift_metrics['drift_score']:.3f}")
    
    # Compare with actuals
    performance = monitor.compare_with_actuals()
    print(f"Model MAE: ${performance['mae']:.2f}")
    print(f"Model MAPE: {performance['mape']*100:.1f}%")

6. Hyperparameter Tuning with Vertex AI

python
# src/ml/tune_hyperparameters.py

from google.cloud import aiplatform
from google.cloud.aiplatform import hyperparameter_tuning as hpt

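def tune_hyperparameters(
    project_id: str,
    training_script: str,
    parameter_spec: dict,
    staging_bucket: str = 'gs://sponsorgraph-models'
) -> str:
    """
    Run hyperparameter tuning job on Vertex AI
    
    Args:
        project_id: GCP project
        training_script: Path to training script
        parameter_spec: Hyperparameter search space
        staging_bucket: GCS bucket where CustomJob.from_local_script stages the
            packaged script (the default assumes the bucket used for model artifacts)
        
    Returns:
        best_trial_id: ID of best hyperparameter combination
    """
    # from_local_script requires a staging bucket; set it once via aiplatform.init
    aiplatform.init(
        project=project_id,
        location='us-central1',
        staging_bucket=staging_bucket
    )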
    
    # Define hyperparameter tuning job
    job = aiplatform.HyperparameterTuningJob(
        display_name='pricing_model_tuning',
        custom_job=aiplatform.CustomJob.from_local_script(
            display_name='pricing_model_training',
            script_path=training_script,
            container_uri='us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.1-0:latest',
            requirements=['scikit-learn==1.3.0', 'google-cloud-bigquery==3.10.0']
        ),
        metric_spec={
            'val_mae': 'minimize'
        },
        parameter_spec=parameter_spec,
        max_trial_count=20,
        parallel_trial_count=5
    )
    
    # Run tuning
    job.run()
    
    # Get best trial
    best_trial = min(job.trials, key=lambda t: t.final_measurement.metrics[0].value)
    
    print(f"Best trial: {best_trial.id}")
    print(f"Best MAE: {best_trial.final_measurement.metrics[0].value:.2f}")
    print(f"Best parameters: {best_trial.parameters}")
    
    return best_trial.id


# Usage
if __name__ == "__main__":
    # Define search space
    parameter_spec = {
        'n_estimators': hpt.IntegerParameterSpec(min=50, max=300, scale='linear'),
        'learning_rate': hpt.DoubleParameterSpec(min=0.01, max=0.3, scale='log'),
        'max_depth': hpt.IntegerParameterSpec(min=3, max=10, scale='linear'),
    }
    
    # Run tuning
    best_trial_id = tune_hyperparameters(
        project_id='sponsorgraph-prod',
        training_script='src/ml/train_pricing_model.py',
        parameter_spec=parameter_spec
    )
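
For tuning to work, the training script has to accept each trial's hyperparameters. Vertex AI passes them as command-line flags named after the keys in `parameter_spec`, and the training script shown in section 3 does not parse any arguments. The sketch below covers only the argument parsing; `parse_trial_args` is a hypothetical helper, and the defaults are copied from the `config.get(...)` fallbacks in `train()`.

```python
import argparse
from typing import Optional


def parse_trial_args(argv: Optional[list] = None) -> dict:
    """Parse hyperparameters passed as flags such as --n_estimators=200."""
    parser = argparse.ArgumentParser(description="Pricing model trial")
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    parser.add_argument("--max_depth", type=int, default=5)
    # parse_known_args ignores any extra flags the platform may add
    args, _unknown = parser.parse_known_args(argv)
    return vars(args)


if __name__ == "__main__":
    config = parse_trial_args()
    print(config)  # this dict could be passed to PricingModelTrainer.train(config)
```

The script also needs to report `val_mae` back to the tuning service after training. This is commonly done with the `cloudml-hypertune` package, which is left out here so the sketch stays runnable without extra dependencies.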

Model Performance Targets

Based on business_logic.md benchmarks:

python
MODEL_PERFORMANCE_TARGETS = {
    'pricing_model': {
        'mae': 0.15,  # Mean Absolute Error < 15%
        'r2': 0.80,   # R² > 0.80 (explain 80% of variance)
        'mape': 0.20, # Mean Absolute Percentage Error < 20%
    },
    
    'attribution_model': {
        'accuracy': 0.90,  # 90% accuracy vs holdout
    },
    
    'conversion_prediction': {
        'mae': 0.20,  # Within 20% of actual
        'confidence_interval': 0.85,
    }
}
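
Targets like these can act as a gate before deployment. The following minimal sketch checks a metrics dict against the pricing model targets. `check_targets` and `HIGHER_IS_BETTER` are hypothetical names, and the example metric values are made up.

```python
PRICING_TARGETS = {"mae": 0.15, "r2": 0.80, "mape": 0.20}

# Metrics where a larger value is better; all others are treated as errors
HIGHER_IS_BETTER = {"r2", "accuracy"}


def check_targets(metrics: dict, targets: dict) -> dict:
    """Return {metric_name: passed} for every target; a missing metric fails."""
    results = {}
    for name, target in targets.items():
        value = metrics.get(name)
        if value is None:
            results[name] = False
        elif name in HIGHER_IS_BETTER:
            results[name] = value > target
        else:
            results[name] = value < target
    return results


if __name__ == "__main__":
    observed = {"mae": 0.12, "r2": 0.85, "mape": 0.25}  # made-up values
    outcome = check_targets(observed, PRICING_TARGETS)
    print(outcome)
    print("deploy" if all(outcome.values()) else "hold")
```

The metrics must be on the same scale as the targets, so an MAE measured in dollars would need to be normalized before comparing it with 0.15.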

MLOps Workflow

bash
# 1. Feature engineering (the trailing + also builds downstream models, e.g. the train/val/test split)
dbt run --select features_pricing_model+

# 2. Train model
python src/ml/train_pricing_model.py --config configs/pricing_v1.yaml

# 3. Evaluate on test set
python src/ml/evaluate_model.py --model-path models/pricing_model_20260207.joblib

# 4. Deploy to Vertex AI
python src/ml/deploy_model.py --model-path models/pricing_model_20260207.joblib

# 5. Monitor predictions
python src/ml/monitor_model.py --check-drift --days 7

Best Practices

1. Version Everything

python
MODEL_VERSION = "v1.2.3"
FEATURE_VERSION = "v2.0.0"
TRAINING_DATE = "2026-02-07"

model_metadata = {
    'model_version': MODEL_VERSION,
    'feature_version': FEATURE_VERSION,
    'training_date': TRAINING_DATE,
    'training_samples': len(train_df),
    'val_mae': val_mae,
    'feature_columns': feature_cols
}

# Save with model
joblib.dump({
    'model': model,
    'metadata': model_metadata
}, model_path)

2. Feature Store Pattern

sql
-- Centralized feature table
-- models/ml/feature_store.sql

{{ config(
    materialized='table',
    partition_by={'field': 'feature_timestamp', 'data_type': 'timestamp'}
) }}

SELECT
    creator_id,
    feature_timestamp,
    
    -- Features
    follower_count,
    avg_engagement_rate,
    -- ... all features
    
    -- Metadata
    'v2.0.0' AS feature_version
FROM {{ ref('int_creator_features') }}

3. Separate Training/Serving Features

python
# Training: Use historical features
train_features = """
SELECT * FROM ml.features_pricing_model
WHERE feature_timestamp <= '2025-12-31'  # Historical cutoff
"""

# Serving: Use current features
serve_features = """
SELECT * FROM ml.feature_store
WHERE feature_timestamp = (SELECT MAX(feature_timestamp) FROM ml.feature_store)
"""

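The training/serving distinction comes down to a point-in-time lookup: training reads the latest feature row at or before a cutoff, while serving reads the latest row overall. A minimal sketch with in-memory rows follows. `latest_as_of` is a hypothetical helper, and the row layout (`creator_id`, `feature_timestamp`) follows the feature store table above.

```python
from datetime import date


def latest_as_of(rows: list, cutoff: date) -> dict:
    """For each creator, keep the most recent feature row at or before cutoff."""
    latest = {}
    for row in rows:
        ts = row["feature_timestamp"]
        if ts > cutoff:
            continue  # rows after the cutoff would leak future information
        current = latest.get(row["creator_id"])
        if current is None or ts > current["feature_timestamp"]:
            latest[row["creator_id"]] = row
    return latest


if __name__ == "__main__":
    rows = [  # made-up feature rows
        {"creator_id": "a", "feature_timestamp": date(2025, 11, 1), "follower_count": 900},
        {"creator_id": "a", "feature_timestamp": date(2026, 1, 15), "follower_count": 1200},
        {"creator_id": "b", "feature_timestamp": date(2025, 12, 20), "follower_count": 50},
    ]
    training_view = latest_as_of(rows, cutoff=date(2025, 12, 31))
    serving_view = latest_as_of(rows, cutoff=date.max)
    print(training_view["a"]["follower_count"], serving_view["a"]["follower_count"])
```

Using `date.max` as the cutoff reproduces the serving query, which selects the most recent feature timestamp.
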
Related Resources