AgentSkillsCN

run-ab-test-models

针对生产环境中的机器学习模型,采用流量拆分、统计显著性检验以及金丝雀/影子部署等策略,开展 A/B 测试。通过对比不同模型的性能表现,基于数据驱动的决策来优化模型的上线与迭代。

SKILL.md
--- frontmatter
name: run-ab-test-models
description: >
  Design and execute A/B tests for ML models in production using traffic splitting,
  statistical significance testing, and canary/shadow deployment strategies. Measure
  performance differences and make data-driven decisions about model rollout.
license: MIT
allowed-tools: Read Write Edit Bash Grep Glob
metadata:
  author: Philipp Thoss
  version: "1.0"
  domain: mlops
  complexity: intermediate
  language: multi
  tags: ab-testing, canary, shadow-deployment, traffic-splitting, statistical-significance, experimentation

Run A/B Test for Models

Execute controlled experiments comparing model versions using traffic splitting and statistical analysis.

When to Use

  • Deploying new model version and want to validate improvement before full rollout
  • Comparing multiple candidate models trained with different algorithms or features
  • Testing impact of hyperparameter changes on business metrics
  • Need to measure model performance in production without risking full traffic
  • Regulatory requirements for gradual rollout (e.g., medical ML systems)
  • Evaluating cost-performance tradeoffs between model sizes

Inputs

  • Required: Champion model (current production version)
  • Required: Challenger model(s) (new version to test)
  • Required: Traffic allocation percentage (e.g., 5% to challenger)
  • Required: Success metrics (business and ML metrics)
  • Required: Minimum sample size or test duration
  • Optional: Guardrail metrics (latency, error rate thresholds)
  • Optional: User segments for stratified testing

Procedure

Step 1: Design Experiment

Define test parameters, success criteria, and statistical requirements.

python
# ab_test/experiment_config.py
from dataclasses import dataclass
from typing import List, Dict
import numpy as np
from scipy.stats import norm


@dataclass
class ExperimentConfig:
    name: str
    champion_model: str
    challenger_models: List[str]
    traffic_allocation: Dict[str, float]  # {model_name: traffic_pct}
    primary_metric: str
    secondary_metrics: List[str]
    guardrail_metrics: Dict[str, tuple]  # {metric: (min, max)}
    min_sample_size: int
    max_duration_days: int
    significance_level: float = 0.05
    power: float = 0.8
    minimum_detectable_effect: float = 0.05  # 5% relative improvement


def calculate_sample_size(
    baseline_conversion: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """
    Calculate required sample size per variant for binomial metric.

    Args:
        baseline_conversion: Current conversion rate (0-1)
        min_detectable_effect: Relative improvement to detect (e.g., 0.05 for 5%)
        alpha: Significance level (Type I error)
        power: Statistical power (1 - Type II error)

    Returns:
        Required sample size per variant
    """
    # Effect size (difference in proportions)
    p1 = baseline_conversion
    p2 = p1 * (1 + min_detectable_effect)

    # Pooled proportion
    p_pool = (p1 + p2) / 2

    # Z-scores for alpha and power
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)

    # Sample size formula for two proportions
    n = (
        (z_alpha * np.sqrt(2 * p_pool * (1 - p_pool)) +
         z_beta * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2
    ) / ((p2 - p1) ** 2)

    return int(np.ceil(n))


# Example experiment configuration
FRAUD_DETECTION_EXPERIMENT = ExperimentConfig(
    name="fraud_model_v2_test",
    champion_model="fraud_detector_v1.3",
    challenger_models=["fraud_detector_v2.0"],
    traffic_allocation={
        "fraud_detector_v1.3": 0.95,
        "fraud_detector_v2.0": 0.05,
    },
    primary_metric="fraud_detection_rate",
    secondary_metrics=["false_positive_rate", "review_time_savings"],
    guardrail_metrics={
        "p95_latency_ms": (0, 200),
        "error_rate": (0, 0.01),
    },
    min_sample_size=10000,
    max_duration_days=14,
    minimum_detectable_effect=0.10,  # Detect 10% improvement
)

# Calculate required sample size
baseline_fraud_rate = 0.15
required_n = calculate_sample_size(
    baseline_conversion=baseline_fraud_rate,
    min_detectable_effect=0.10,  # Detect 10% improvement
    alpha=0.05,
    power=0.8,
)

print(f"Required sample size per variant: {required_n:,}")
print(f"With 5% traffic to challenger, need ~{required_n / 0.05:,.0f} total transactions")

Expected: Experiment configuration with statistically sound sample size calculation, typically 5-10k samples per variant for 5-10% MDE.

On failure: If required sample size too large, increase traffic allocation, extend test duration, or accept larger MDE; verify baseline metric estimate is accurate; consider sequential testing for continuous monitoring.

Step 2: Implement Traffic Splitting

Set up routing logic to randomly assign requests to models.

python
# ab_test/traffic_router.py
import hashlib
import random
from typing import Dict, Optional
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class RoutingDecision:
    model_name: str
    variant: str
    assignment_id: str


class TrafficRouter:
    def __init__(self, experiment_config: 'ExperimentConfig', seed: int = 42):
        self.config = experiment_config
        self.traffic_allocation = experiment_config.traffic_allocation
        self.seed = seed

        # Validate allocation sums to 1.0
        total = sum(self.traffic_allocation.values())
        assert abs(total - 1.0) < 0.001, f"Traffic allocation must sum to 1.0, got {total}"

        # Create cumulative buckets for routing
        self.buckets = self._create_buckets()

    def _create_buckets(self) -> Dict[str, tuple]:
        """
        Create cumulative probability buckets.
        Example: {model_a: 0.95, model_b: 0.05} ->
                 {model_a: (0, 0.95), model_b: (0.95, 1.0)}
        """
        buckets = {}
        cumsum = 0.0
        for model_name, traffic in self.traffic_allocation.items():
            buckets[model_name] = (cumsum, cumsum + traffic)
            cumsum += traffic
        return buckets

    def assign_variant(self, user_id: str) -> RoutingDecision:
        """
        Deterministically assign user to variant using consistent hashing.
        Same user_id always gets same variant during experiment.
        """
        # Hash user_id to get deterministic random value [0, 1)
        hash_value = int(
            hashlib.md5(f"{user_id}_{self.config.name}_{self.seed}".encode()).hexdigest(),
            16
        )
        random_value = (hash_value % 10000) / 10000.0

        # Find which bucket user falls into
        for model_name, (lower, upper) in self.buckets.items():
            if lower <= random_value < upper:
                return RoutingDecision(
                    model_name=model_name,
                    variant="champion" if model_name == self.config.champion_model else "challenger",
                    assignment_id=f"{user_id}_{self.config.name}",
                )

        # Fallback (should never happen)
        logger.error(f"Failed to assign variant for user {user_id}, using champion")
        return RoutingDecision(
            model_name=self.config.champion_model,
            variant="champion",
            assignment_id=f"{user_id}_{self.config.name}",
        )

    def assign_random_variant(self, session_id: str) -> RoutingDecision:
        """
        Random assignment for non-logged-in users (less stable but acceptable).
        """
        random_value = random.random()
        for model_name, (lower, upper) in self.buckets.items():
            if lower <= random_value < upper:
                return RoutingDecision(
                    model_name=model_name,
                    variant="champion" if model_name == self.config.champion_model else "challenger",
                    assignment_id=f"session_{session_id}",
                )

        return RoutingDecision(
            model_name=self.config.champion_model,
            variant="champion",
            assignment_id=f"session_{session_id}",
        )


# Example usage in prediction service
class ModelServer:
    def __init__(self, router: TrafficRouter):
        self.router = router
        self.models = self._load_models()

    def _load_models(self):
        # Load all models in experiment
        return {
            "fraud_detector_v1.3": load_model("models/fraud_v1.3.pkl"),
            "fraud_detector_v2.0": load_model("models/fraud_v2.0.pkl"),
        }

    def predict(self, user_id: str, features: dict) -> dict:
        # Assign user to variant
        assignment = self.router.assign_variant(user_id)

        # Get prediction from assigned model
        model = self.models[assignment.model_name]
        prediction = model.predict_proba(features)

        # Log assignment for analysis
        log_experiment_event({
            "user_id": user_id,
            "assignment_id": assignment.assignment_id,
            "model_name": assignment.model_name,
            "variant": assignment.variant,
            "prediction": prediction,
            "timestamp": datetime.now().isoformat(),
        })

        return {
            "prediction": prediction,
            "model_version": assignment.model_name,
        }


def load_model(path: str):
    """Load model from disk."""
    import joblib
    return joblib.load(path)


def log_experiment_event(event: dict):
    """Log to data warehouse for analysis."""
    # Example: send to Kafka or write to database
    logger.info(f"Experiment event: {event}")

Expected: Consistent user-to-variant assignment, accurate traffic split matching configured percentages, all assignments logged for analysis.

On failure: Verify hash function produces uniform distribution (test with 10k user IDs), check that user_id is stable across requests (not session_id), ensure logs capture all prediction events, validate traffic split in first 1000 requests.

Step 3: Implement Shadow Deployment (Optional)

Run challenger model in parallel without affecting users (shadow mode).

python
# ab_test/shadow_deployment.py
import asyncio
from typing import Dict, Any
import logging
from concurrent.futures import ThreadPoolExecutor
import time

logger = logging.getLogger(__name__)


class ShadowDeployment:
    """
    Run champion and challenger in parallel, only return champion results.
    Compare predictions for consistency/debugging before live traffic split.
    """
    def __init__(self, champion_model, challenger_model, log_differences: bool = True):
        self.champion = champion_model
        self.challenger = challenger_model
        self.log_differences = log_differences
        self.executor = ThreadPoolExecutor(max_workers=2)

    def predict(self, features: Dict[str, Any]) -> Dict:
        """
        Get predictions from both models, return only champion result.
        """
        # Run both predictions in parallel
        champion_future = self.executor.submit(self._predict_champion, features)
        challenger_future = self.executor.submit(self._predict_challenger, features)

        # Wait for champion (serve immediately)
        champion_result = champion_future.result()

        # Log challenger result asynchronously
        self.executor.submit(self._log_challenger_result, challenger_future, champion_result, features)

        return champion_result

    def _predict_champion(self, features: Dict) -> Dict:
        """Get champion prediction."""
        start_time = time.time()
        prediction = self.champion.predict_proba(features)
        latency = time.time() - start_time

        return {
            "prediction": prediction,
            "model": "champion",
            "latency_ms": latency * 1000,
        }

    def _predict_challenger(self, features: Dict) -> Dict:
        """Get challenger prediction."""
        start_time = time.time()
        try:
            prediction = self.challenger.predict_proba(features)
            latency = time.time() - start_time

            return {
                "prediction": prediction,
                "model": "challenger",
                "latency_ms": latency * 1000,
                "error": None,
            }
        except Exception as e:
            logger.error(f"Challenger prediction failed: {e}")
            return {
                "prediction": None,
                "model": "challenger",
                "latency_ms": None,
                "error": str(e),
            }

    def _log_challenger_result(self, future, champion_result: Dict, features: Dict):
        """Log challenger result for offline analysis."""
        try:
            challenger_result = future.result(timeout=5.0)

            # Calculate prediction difference
            if challenger_result["prediction"] is not None:
                pred_diff = abs(
                    champion_result["prediction"] - challenger_result["prediction"]
                )

                log_entry = {
                    "timestamp": time.time(),
                    "champion_prediction": champion_result["prediction"],
                    "challenger_prediction": challenger_result["prediction"],
                    "prediction_difference": pred_diff,
                    "champion_latency_ms": champion_result["latency_ms"],
                    "challenger_latency_ms": challenger_result["latency_ms"],
                    "features": features,
                }

                # Log large differences for investigation
                if self.log_differences and pred_diff > 0.1:
                    logger.warning(f"Large prediction difference: {pred_diff:.3f}")

                # Send to analytics pipeline
                self._send_to_analytics(log_entry)

        except Exception as e:
            logger.error(f"Failed to log challenger result: {e}")

    def _send_to_analytics(self, log_entry: Dict):
        """Send shadow deployment results to analytics."""
        # Example: write to Kafka, S3, or database
        # kafka_producer.send("shadow_predictions", log_entry)
        pass


# Example usage
if __name__ == "__main__":
    champion = load_model("models/fraud_v1.3.pkl")
    challenger = load_model("models/fraud_v2.0.pkl")

    shadow = ShadowDeployment(champion, challenger, log_differences=True)

    # Serve predictions (only champion affects users)
    for request in incoming_requests:
        result = shadow.predict(request["features"])
        send_response(result)

Expected: Champion predictions served with normal latency, challenger predictions logged asynchronously without blocking, prediction differences captured for analysis.

On failure: Set challenger timeout < champion SLA to avoid blocking, handle challenger errors gracefully without affecting champion, monitor memory usage (two models loaded), consider sampling (log only 10% of shadow predictions).

Step 4: Collect and Analyze Metrics

Gather experiment data and perform statistical tests.

python
# ab_test/analysis.py
import pandas as pd
import numpy as np
from scipy import stats
from typing import Dict, Tuple
import logging

logger = logging.getLogger(__name__)


class ExperimentAnalyzer:
    def __init__(self, experiment_config: 'ExperimentConfig'):
        self.config = experiment_config

    def load_experiment_data(self, start_date: str, end_date: str) -> pd.DataFrame:
        """
        Load experiment data from data warehouse.
        """
        query = f"""
        SELECT
            assignment_id,
            user_id,
            variant,
            model_name,
            prediction,
            outcome,  -- ground truth
            timestamp,
            latency_ms
        FROM experiment_logs
        WHERE experiment_name = '{self.config.name}'
          AND timestamp >= '{start_date}'
          AND timestamp < '{end_date}'
        """
        # df = data_warehouse.query(query)
        df = pd.read_parquet("ab_test/data/experiment_results.parquet")
        return df

    def calculate_metrics(self, df: pd.DataFrame) -> Dict[str, Dict]:
        """
        Calculate metrics for each variant.
        """
        results = {}

        for variant in df["variant"].unique():
            variant_df = df[df["variant"] == variant]

            # Primary metric (e.g., conversion rate, detection rate)
            if self.config.primary_metric == "fraud_detection_rate":
                primary_value = (variant_df["outcome"] == 1).mean()
            elif self.config.primary_metric == "conversion_rate":
                primary_value = (variant_df["outcome"] == "converted").mean()
            else:
                primary_value = variant_df["outcome"].mean()

            # Secondary metrics
            secondary = {}
            for metric in self.config.secondary_metrics:
                if metric == "false_positive_rate":
                    # FP = predicted positive, actually negative
                    secondary[metric] = (
                        (variant_df["prediction"] > 0.5) & (variant_df["outcome"] == 0)
                    ).mean()
                elif metric == "latency_ms":
                    secondary[metric] = variant_df["latency_ms"].mean()
                else:
                    secondary[metric] = variant_df[metric].mean()

            results[variant] = {
                "n_samples": len(variant_df),
                "primary_metric": primary_value,
                "secondary_metrics": secondary,
                "raw_data": variant_df,
            }

        return results

    def test_statistical_significance(
        self,
        champion_data: pd.Series,
        challenger_data: pd.Series,
        metric_type: str = "proportion",
    ) -> Tuple[float, float, Dict]:
        """
        Perform statistical test to determine if difference is significant.

        Args:
            champion_data: Champion variant metric values
            challenger_data: Challenger variant metric values
            metric_type: "proportion" (binomial) or "continuous" (t-test)

        Returns:
            (test_statistic, p_value, details_dict)
        """
        if metric_type == "proportion":
            # Two-proportion z-test
            n1, n2 = len(champion_data), len(challenger_data)
            p1, p2 = champion_data.mean(), challenger_data.mean()

            # Pooled proportion
            p_pool = (champion_data.sum() + challenger_data.sum()) / (n1 + n2)

            # Standard error
            se = np.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2))

            # Z-statistic
            z_stat = (p2 - p1) / se
            p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

            # Confidence interval for difference
            diff = p2 - p1
            ci_margin = 1.96 * se

            details = {
                "champion_rate": p1,
                "challenger_rate": p2,
                "absolute_difference": diff,
                "relative_lift": (p2 - p1) / p1 if p1 > 0 else None,
                "confidence_interval": (diff - ci_margin, diff + ci_margin),
            }

            return z_stat, p_value, details

        elif metric_type == "continuous":
            # Welch's t-test (unequal variances)
            t_stat, p_value = stats.ttest_ind(
                challenger_data,
                champion_data,
                equal_var=False,
            )

            diff = challenger_data.mean() - champion_data.mean()

            details = {
                "champion_mean": champion_data.mean(),
                "challenger_mean": challenger_data.mean(),
                "absolute_difference": diff,
                "relative_change": diff / champion_data.mean() if champion_data.mean() != 0 else None,
            }

            return t_stat, p_value, details

        else:
            raise ValueError(f"Unknown metric_type: {metric_type}")

    def generate_report(self, df: pd.DataFrame) -> Dict:
        """
        Generate comprehensive experiment report.
        """
        metrics = self.calculate_metrics(df)

        champion_outcome = metrics["champion"]["raw_data"]["outcome"]
        challenger_outcome = metrics["challenger"]["raw_data"]["outcome"]

        # Test primary metric
        test_stat, p_value, details = self.test_statistical_significance(
            champion_outcome,
            challenger_outcome,
            metric_type="proportion",
        )

        # Determine winner
        is_significant = p_value < self.config.significance_level
        challenger_better = details["challenger_rate"] > details["champion_rate"]

        if is_significant and challenger_better:
            decision = "ROLLOUT_CHALLENGER"
            reason = f"Challenger significantly better (p={p_value:.4f}, lift={details['relative_lift']:.2%})"
        elif is_significant and not challenger_better:
            decision = "KEEP_CHAMPION"
            reason = f"Champion significantly better (p={p_value:.4f})"
        else:
            decision = "INCONCLUSIVE"
            reason = f"No significant difference (p={p_value:.4f})"

        report = {
            "experiment_name": self.config.name,
            "duration_days": (df["timestamp"].max() - df["timestamp"].min()).days,
            "total_samples": len(df),
            "champion_samples": metrics["champion"]["n_samples"],
            "challenger_samples": metrics["challenger"]["n_samples"],
            "primary_metric": {
                "name": self.config.primary_metric,
                "champion": details["champion_rate"],
                "challenger": details["challenger_rate"],
                "absolute_lift": details["absolute_difference"],
                "relative_lift": details["relative_lift"],
                "confidence_interval": details["confidence_interval"],
                "p_value": p_value,
                "significant": is_significant,
            },
            "decision": decision,
            "reason": reason,
        }

        return report


# Example usage
if __name__ == "__main__":
    from experiment_config import FRAUD_DETECTION_EXPERIMENT

    analyzer = ExperimentAnalyzer(FRAUD_DETECTION_EXPERIMENT)

    # Load data
    df = analyzer.load_experiment_data("2024-01-01", "2024-01-14")

    # Generate report
    report = analyzer.generate_report(df)

    print(f"Decision: {report['decision']}")
    print(f"Reason: {report['reason']}")
    print(f"Primary metric lift: {report['primary_metric']['relative_lift']:.2%}")
    print(f"P-value: {report['primary_metric']['p_value']:.4f}")

Expected: Statistical test results with p-values, confidence intervals, and clear decision (rollout/keep/inconclusive), typically after 7-14 days or reaching sample size.

On failure: Verify ground truth labels are available (may need delayed analysis), check for sample ratio mismatch (SRM) indicating assignment bugs, ensure sufficient sample size reached, look for novelty/primacy effects in early data, consider sequential testing if fixed-horizon test is too slow.

Step 5: Monitor Guardrail Metrics

Continuously check that challenger doesn't violate safety thresholds.

python
# ab_test/guardrails.py
import pandas as pd
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class GuardrailMonitor:
    def __init__(self, experiment_config: 'ExperimentConfig'):
        self.config = experiment_config
        self.guardrails = experiment_config.guardrail_metrics
        self.violations = []

    def check_guardrails(self, df: pd.DataFrame) -> Dict:
        """
        Check if challenger violates any guardrail thresholds.
        """
        challenger_df = df[df["variant"] == "challenger"]

        violations = []
        metrics = {}

        for metric_name, (min_threshold, max_threshold) in self.guardrails.items():
            # Calculate metric
            if metric_name == "p95_latency_ms":
                metric_value = challenger_df["latency_ms"].quantile(0.95)
            elif metric_name == "error_rate":
                metric_value = (challenger_df["error"] == 1).mean()
            elif metric_name == "timeout_rate":
                metric_value = (challenger_df["timeout"] == 1).mean()
            else:
                metric_value = challenger_df[metric_name].mean()

            # Check thresholds
            violated = (metric_value < min_threshold or metric_value > max_threshold)

            if violated:
                violation = {
                    "metric": metric_name,
                    "value": metric_value,
                    "threshold": (min_threshold, max_threshold),
                    "timestamp": pd.Timestamp.now(),
                }
                violations.append(violation)
                logger.error(f"Guardrail violation: {metric_name}={metric_value:.3f}")

            metrics[metric_name] = {
                "value": metric_value,
                "threshold": (min_threshold, max_threshold),
                "violated": violated,
            }

        self.violations.extend(violations)

        return {
            "has_violations": len(violations) > 0,
            "violations": violations,
            "metrics": metrics,
        }

    def should_stop_experiment(self, violation_count: int = 3) -> bool:
        """
        Determine if experiment should be stopped due to repeated violations.
        """
        recent_violations = [
            v for v in self.violations
            if (pd.Timestamp.now() - v["timestamp"]).total_seconds() < 3600  # Last hour
        ]

        return len(recent_violations) >= violation_count


# Example monitoring loop
def monitor_experiment_health():
    """
    Continuous monitoring job that checks guardrails every 5 minutes.
    """
    import time
    from experiment_config import FRAUD_DETECTION_EXPERIMENT

    monitor = GuardrailMonitor(FRAUD_DETECTION_EXPERIMENT)

    while True:
        # Load recent data (last 15 minutes)
        df = load_recent_experiment_data(minutes=15)

        # Check guardrails
        result = monitor.check_guardrails(df)

        if result["has_violations"]:
            logger.warning(f"Guardrail violations detected: {result['violations']}")

            # Send alert
            send_alert(f"Experiment {FRAUD_DETECTION_EXPERIMENT.name} has guardrail violations")

            # Stop experiment if too many violations
            if monitor.should_stop_experiment(violation_count=3):
                logger.critical("Stopping experiment due to repeated violations")
                stop_experiment(FRAUD_DETECTION_EXPERIMENT.name)
                break

        time.sleep(300)  # Check every 5 minutes


def load_recent_experiment_data(minutes: int = 15) -> pd.DataFrame:
    """Load data from last N minutes."""
    # Implementation depends on data pipeline
    pass


def stop_experiment(experiment_name: str):
    """
    Emergency stop: route all traffic back to champion.
    """
    # Update routing configuration to 100% champion
    logger.info(f"Stopping experiment: {experiment_name}")


def send_alert(message: str):
    """Send alert to on-call team."""
    logger.info(f"Alert: {message}")

Expected: Guardrail violations detected within 5-15 minutes, automated experiment stop if critical thresholds breached (latency, errors), alerts sent to team.

On failure: Verify guardrail thresholds are realistic (not too tight), ensure monitoring loop is running continuously, check that stop_experiment() function actually updates routing, test alert delivery channels.

Step 6: Make Rollout Decision

Based on experiment results, decide whether to rollout challenger.

python
# ab_test/rollout_decision.py
import logging
from typing import Dict
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class RolloutPlan:
    decision: str  # "full_rollout", "gradual_rollout", "rollback", "extend_test"
    reason: str
    next_steps: list
    risk_level: str


def make_rollout_decision(experiment_report: Dict) -> RolloutPlan:
    """
    Determine rollout plan based on experiment results.
    """
    primary = experiment_report["primary_metric"]

    # Check statistical significance
    if not primary["significant"]:
        if experiment_report["duration_days"] >= 14:
            return RolloutPlan(
                decision="keep_champion",
                reason="No significant difference after 14 days",
                next_steps=[
                    "Consider testing larger changes",
                    "Analyze per-segment performance",
                ],
                risk_level="low",
            )
        else:
            return RolloutPlan(
                decision="extend_test",
                reason="Not yet significant, need more data",
                next_steps=[
                    f"Continue test until {experiment_report['champion_samples'] * 2} samples",
                    "Monitor for seasonal effects",
                ],
                risk_level="low",
            )

    # Challenger significantly better
    if primary["challenger"] > primary["champion"]:
        relative_lift = primary["relative_lift"]

        if relative_lift > 0.20:
            # Large improvement: full rollout
            return RolloutPlan(
                decision="full_rollout",
                reason=f"Large significant improvement ({relative_lift:.1%} lift)",
                next_steps=[
                    "Update routing to 100% challenger",
                    "Archive champion model as v1_backup",
                    "Update monitoring dashboards",
                ],
                risk_level="low",
            )
        elif relative_lift > 0.05:
            # Moderate improvement: gradual rollout
            return RolloutPlan(
                decision="gradual_rollout",
                reason=f"Moderate improvement ({relative_lift:.1%} lift)",
                next_steps=[
                    "Week 1: Increase to 25% traffic",
                    "Week 2: Increase to 50% traffic",
                    "Week 3: Increase to 100% if stable",
                ],
                risk_level="medium",
            )
        else:
            # Small improvement: may not be worth complexity
            return RolloutPlan(
                decision="keep_champion",
                reason=f"Improvement too small to justify change ({relative_lift:.1%})",
                next_steps=[
                    "Consider cost/benefit of model complexity",
                    "Test on high-value user segments only",
                ],
                risk_level="low",
            )

    # Champion better: rollback
    else:
        return RolloutPlan(
            decision="rollback",
            reason="Challenger performed worse than champion",
            next_steps=[
                "Immediately route 100% to champion",
                "Investigate why challenger underperformed",
                "Retrain with more recent data",
            ],
            risk_level="critical",
        )


# Example usage
if __name__ == "__main__":
    from analysis import ExperimentAnalyzer
    from experiment_config import FRAUD_DETECTION_EXPERIMENT

    analyzer = ExperimentAnalyzer(FRAUD_DETECTION_EXPERIMENT)
    df = analyzer.load_experiment_data("2024-01-01", "2024-01-14")
    report = analyzer.generate_report(df)

    # Make decision
    plan = make_rollout_decision(report)

    print(f"Decision: {plan.decision}")
    print(f"Reason: {plan.reason}")
    print(f"Risk: {plan.risk_level}")
    print("\nNext steps:")
    for step in plan.next_steps:
        print(f"  - {step}")

Expected: Clear decision (full/gradual rollout, keep champion, or extend test) with justification and action items.

On failure: If decision unclear, perform subgroup analysis (by user segment, time of day, device type), check for interaction effects, review business context (e.g., is 2% lift worth engineering cost?), consult with stakeholders.

Validation

  • Traffic split matches configured percentages (within 1%)
  • Same user always assigned to same variant (consistency check)
  • Sample size calculation produces reasonable numbers (5-50k per variant)
  • Statistical tests produce p-values consistent with manual calculation
  • Guardrail violations trigger alerts within 5 minutes
  • Shadow deployment shows <5% prediction divergence between models
  • Experiment reports include confidence intervals
  • Rollout decision documented with justification

Common Pitfalls

  • Sample ratio mismatch (SRM): If observed traffic split differs from configured (e.g., 95/5 becomes 92/8), indicates assignment bug; check hash function uniformity
  • Peeking: Checking results before reaching sample size inflates Type I error; use sequential testing or wait for pre-determined end date
  • Novelty effect: Users respond differently to new model initially; run for 2+ weeks to see steady-state behavior
  • Carryover effects: Previous variant exposure affects current behavior; use new users or sufficient washout period
  • Multiple testing: Testing many metrics increases false positive risk; correct with Bonferroni or focus on single primary metric
  • Insufficient power: Small traffic allocation may require months to detect realistic effects; balance statistical power with risk tolerance
  • Ignoring segments: Aggregate lift may hide negative impact on important user segments; perform subgroup analysis
  • Attribution errors: Ensure outcome metrics correctly attributed to model predictions (not other system changes)

Related Skills

  • deploy-ml-model-serving - Model deployment infrastructure and versioning
  • monitor-model-drift - Ongoing performance monitoring post-rollout