AgentSkillsCN

Database Migration Planner Expert

数据库迁移规划专家

SKILL.md

Database Migration Planner Expert

You are an expert in database migration planning, specializing in designing comprehensive migration strategies that minimize downtime, ensure data integrity, and provide rollback capabilities. Your expertise covers schema migrations, data transformations, cross-platform migrations, and production deployment strategies.

Core Migration Principles

Risk Assessment Framework

  • Data Volume Analysis: Calculate migration time based on table sizes and network throughput
  • Dependency Mapping: Identify foreign keys, triggers, stored procedures, and application dependencies
  • Downtime Tolerance: Categorize migrations as online, near-zero downtime, or maintenance window required
  • Rollback Strategy: Always plan forward and backward migration paths

Pre-Migration Checklist

bash
# Essential pre-migration validation
# 1. Database size and growth rate analysis
SELECT 
    table_schema,
    table_name,
    ROUND(((data_length + index_length) / 1024 / 1024), 2) AS size_mb,
    table_rows
FROM information_schema.tables 
WHERE table_schema = 'your_database'
ORDER BY (data_length + index_length) DESC;

# 2. Identify active connections and transactions
SHOW PROCESSLIST;
SELECT * FROM information_schema.innodb_trx;

# 3. Check replication lag (if applicable)
SHOW SLAVE STATUS\G

Migration Strategy Patterns

Blue-Green Migration

yaml
# Blue-Green deployment configuration
blue_green_migration:
  phases:
    - name: "preparation"
      tasks:
        - provision_target_environment
        - replicate_schema
        - setup_data_sync
    - name: "data_sync"
      tasks:
        - initial_bulk_copy
        - continuous_replication
        - lag_monitoring
    - name: "cutover"
      tasks:
        - application_maintenance_mode
        - final_sync
        - dns_switch
        - validation

Rolling Migration with Dual Writes

python
# Dual write pattern for gradual migration
class DualWriteManager:
    def __init__(self, old_db, new_db):
        self.old_db = old_db
        self.new_db = new_db
        self.shadow_mode = True
    
    def write_data(self, data):
        # Always write to primary (old) database
        old_result = self.old_db.write(data)
        
        try:
            # Write to new database
            new_result = self.new_db.write(data)
            
            if not self.shadow_mode:
                # Compare results for consistency
                self.validate_consistency(old_result, new_result)
                
        except Exception as e:
            # Log but don't fail - new DB is shadow
            self.log_shadow_error(e)
            
        return old_result

Schema Migration Best Practices

Version-Controlled Schema Changes

sql
-- Migration script template with safety checks
-- Migration: 20241201_add_user_preferences
-- Author: Migration Team
-- Description: Add user preferences table with foreign key to users

START TRANSACTION;

-- Safety check: Ensure we're on correct database
SELECT DATABASE() as current_db;

-- Create table with proper constraints
CREATE TABLE IF NOT EXISTS user_preferences (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    preference_key VARCHAR(100) NOT NULL,
    preference_value TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    UNIQUE KEY unique_user_preference (user_id, preference_key),
    INDEX idx_user_preferences_user_id (user_id)
);

-- Verify table creation
SHOW CREATE TABLE user_preferences;

-- Record migration
INSERT INTO schema_migrations (version, applied_at) 
VALUES ('20241201_add_user_preferences', NOW());

COMMIT;

Large Table Modifications

bash
#!/bin/bash
# Online schema change using pt-online-schema-change
# for large tables without blocking

pt-online-schema-change \
  --alter "ADD COLUMN email_verified BOOLEAN DEFAULT FALSE" \
  --execute \
  --chunk-size=1000 \
  --chunk-time=0.1 \
  --max-load="Threads_running=25" \
  --critical-load="Threads_running=50" \
  --drop-old-table \
  D=production_db,t=users

Data Validation and Testing

Automated Validation Scripts

python
# Comprehensive data validation framework
import hashlib
import pandas as pd

class MigrationValidator:
    def __init__(self, source_conn, target_conn):
        self.source = source_conn
        self.target = target_conn
        self.validation_results = []
    
    def validate_row_counts(self, tables):
        for table in tables:
            source_count = self.source.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
            target_count = self.target.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
            
            self.validation_results.append({
                'table': table,
                'test': 'row_count',
                'source': source_count,
                'target': target_count,
                'passed': source_count == target_count
            })
    
    def validate_data_integrity(self, table, key_column):
        # Sample-based checksum validation
        sample_query = f"""
        SELECT {key_column}, 
               MD5(CONCAT_WS('|', column1, column2, column3)) as checksum
        FROM {table} 
        ORDER BY RAND() 
        LIMIT 1000
        """
        
        source_checksums = pd.read_sql(sample_query, self.source)
        target_checksums = pd.read_sql(sample_query, self.target)
        
        merged = source_checksums.merge(
            target_checksums, 
            on=key_column, 
            suffixes=('_source', '_target')
        )
        
        mismatches = merged[
            merged['checksum_source'] != merged['checksum_target']
        ]
        
        return len(mismatches) == 0, mismatches

Rollback and Recovery Strategies

Automated Rollback Procedures

bash
#!/bin/bash
# Emergency rollback script

ROLLBACK_POINT="migration_$(date +%Y%m%d_%H%M%S)"

# Create rollback checkpoint
create_rollback_point() {
    echo "Creating rollback point: $ROLLBACK_POINT"
    mysqldump --single-transaction --routines --triggers \
              production_db > "/backups/${ROLLBACK_POINT}.sql"
    
    # Store application configuration
    kubectl get configmap app-config -o yaml > "/backups/${ROLLBACK_POINT}_config.yaml"
}

# Execute rollback
execute_rollback() {
    echo "EMERGENCY ROLLBACK INITIATED"
    
    # 1. Put application in maintenance mode
    kubectl patch deployment app --patch '{"spec":{"replicas":0}}'
    
    # 2. Restore database
    mysql production_db < "/backups/${ROLLBACK_POINT}.sql"
    
    # 3. Restore application config
    kubectl apply -f "/backups/${ROLLBACK_POINT}_config.yaml"
    
    # 4. Restart application
    kubectl patch deployment app --patch '{"spec":{"replicas":3}}'
    
    echo "Rollback completed. System restored to $ROLLBACK_POINT"
}

Performance Optimization

Migration Tuning Parameters

ini
# MySQL configuration for migration performance
[mysqld]
# Increase buffer pool for faster data processing
innodb_buffer_pool_size = 8G

# Optimize for bulk operations
innodb_flush_log_at_trx_commit = 2
sync_binlog = 0

# Increase batch size
bulk_insert_buffer_size = 256M
innodb_autoinc_lock_mode = 2

# Parallel processing
innodb_parallel_read_threads = 8

Monitoring and Alerting

Migration Progress Tracking

python
# Real-time migration monitoring
class MigrationMonitor:
    def __init__(self, migration_id):
        self.migration_id = migration_id
        self.start_time = time.time()
    
    def track_progress(self, completed_rows, total_rows):
        elapsed = time.time() - self.start_time
        progress_pct = (completed_rows / total_rows) * 100
        rate = completed_rows / elapsed if elapsed > 0 else 0
        eta = (total_rows - completed_rows) / rate if rate > 0 else 0
        
        metrics = {
            'migration_id': self.migration_id,
            'progress_percent': progress_pct,
            'rows_per_second': rate,
            'eta_seconds': eta,
            'elapsed_seconds': elapsed
        }
        
        # Send to monitoring system
        self.send_metrics(metrics)
        
        # Alert if migration is too slow
        if rate < self.expected_rate * 0.5:
            self.alert_slow_migration(rate)

Always perform migrations during low-traffic periods, maintain comprehensive backups, and have a tested rollback plan ready before executing any production migration.