Schema Sync

This skill provides expertise in synchronizing database schemas between SQLite and Supabase Postgres. It should be used when the user asks about "schema sync", "database diff", "schema migration", "sync schemas", "schema drift", or similar database synchronization topics.

Schema Synchronization Between SQLite and Postgres

Master the patterns and techniques for keeping SQLite and Postgres schemas synchronized, detecting drift, and generating migrations automatically.

Core Synchronization Strategy

The Challenge

Many ETL pipelines use SQLite for local state management and Postgres (Supabase) for remote storage. Keeping these schemas synchronized is critical to avoid data inconsistencies.

Common Issues:

  • Schema drift (tables exist in one DB but not the other)
  • Column mismatches (different types, constraints)
  • Index differences
  • Migration state divergence

The Solution: Database-First Architecture

Define the schema once in Python code and treat that definition as the source of truth for both databases:

python
# src/database/schema.py
SCHEMA_DEFINITION = {
    "opengov_projects": {
        "columns": [
            ("project_id", "TEXT PRIMARY KEY"),
            ("project_name", "TEXT NOT NULL"),
            ("status", "TEXT DEFAULT 'pending'"),
            ("extracted", "INTEGER DEFAULT 0"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
            ("updated_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
        "indexes": [
            "CREATE INDEX IF NOT EXISTS idx_status ON opengov_projects(status)",
            "CREATE INDEX IF NOT EXISTS idx_extracted ON opengov_projects(extracted)",
        ]
    },
    "opengov_opportunities": {
        "columns": [
            ("id", "TEXT PRIMARY KEY"),
            ("project_id", "TEXT NOT NULL"),
            ("title", "TEXT"),
            ("description", "TEXT"),
            ("amount", "REAL"),
            ("deadline", "TEXT"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
        "indexes": [
            "CREATE INDEX IF NOT EXISTS idx_project_id ON opengov_opportunities(project_id)",
        ],
        "foreign_keys": [
            "FOREIGN KEY (project_id) REFERENCES opengov_projects(project_id)"
        ]
    }
}
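
One way to turn a definition like this into DDL for either database is sketched below. The helper name `generate_create_table_sql` matches the call used later in this document, but its body, the extra `schema` parameter, the trimmed-down `SCHEMA` dict, and the small type-translation table are assumptions for illustration, not the skill's actual implementation.

```python
# Illustrative sketch: build CREATE TABLE statements from a schema dict.
# The translation table and function body are assumptions for this example.
SQLITE_TO_POSTGRES = {
    "REAL": "DOUBLE PRECISION",
    "BLOB": "BYTEA",
    "TIMESTAMP": "TIMESTAMP WITH TIME ZONE",
}

SCHEMA = {
    "opengov_opportunities": {
        "columns": [
            ("id", "TEXT PRIMARY KEY"),
            ("project_id", "TEXT NOT NULL"),
            ("amount", "REAL"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
        "foreign_keys": [
            "FOREIGN KEY (project_id) REFERENCES opengov_projects(project_id)"
        ],
    }
}


def generate_create_table_sql(table, dialect, schema=SCHEMA):
    """Return a CREATE TABLE statement for `table` in the given dialect."""
    spec = schema[table]
    lines = []
    for name, definition in spec["columns"]:
        # The first word of the definition is the type; the rest are constraints.
        col_type, _, rest = definition.partition(" ")
        if dialect == "postgres":
            col_type = SQLITE_TO_POSTGRES.get(col_type.upper(), col_type)
        lines.append(f"    {name} {col_type} {rest}".rstrip())
    lines.extend(f"    {fk}" for fk in spec.get("foreign_keys", []))
    body = ",\n".join(lines)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{body}\n);"


print(generate_create_table_sql("opengov_opportunities", "postgres"))
```

Passing `"sqlite"` as the dialect leaves the column types untouched, so the same definition serves both databases.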

Schema Diffing

Detecting Schema Differences

Compare schemas between SQLite and Postgres:

python
def compare_schemas(sqlite_conn, postgres_conn):
    """
    Compare schemas between SQLite and Postgres.

    Returns:
        dict: Differences found
    """
    differences = {
        "missing_in_postgres": [],
        "missing_in_sqlite": [],
        "column_mismatches": [],
        "index_differences": [],
    }

    # Get table lists
    sqlite_tables = get_sqlite_tables(sqlite_conn)
    postgres_tables = get_postgres_tables(postgres_conn)

    # Find missing tables
    # Store sorted lists (not sets) so the result is deterministic and serializable
    differences["missing_in_postgres"] = sorted(set(sqlite_tables) - set(postgres_tables))
    differences["missing_in_sqlite"] = sorted(set(postgres_tables) - set(sqlite_tables))

    # Compare common tables
    common_tables = set(sqlite_tables) & set(postgres_tables)
    for table in common_tables:
        column_diff = compare_table_columns(
            table, sqlite_conn, postgres_conn
        )
        if column_diff:
            differences["column_mismatches"].append({
                "table": table,
                "differences": column_diff
            })

    return differences

def get_sqlite_tables(conn):
    """Get list of tables in SQLite database."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    )
    return [row[0] for row in cursor.fetchall()]

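def get_postgres_tables(conn, schema='public'):
    """Get list of tables in Postgres schema."""
    # Pass the schema name as a bound parameter instead of interpolating it
    # into the SQL string. Assumes a psycopg 3 connection, which exposes
    # conn.execute(); with psycopg2, run the query through conn.cursor().
    cursor = conn.execute(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
        AND table_type = 'BASE TABLE'
        """,
        (schema,),
    )
    return [row[0] for row in cursor.fetchall()]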

def compare_table_columns(table, sqlite_conn, postgres_conn):
    """Compare columns between SQLite and Postgres for a specific table."""
    sqlite_cols = get_sqlite_columns(table, sqlite_conn)
    postgres_cols = get_postgres_columns(table, postgres_conn)

    differences = []

    # Check for missing columns
    sqlite_col_names = {col['name'] for col in sqlite_cols}
    postgres_col_names = {col['name'] for col in postgres_cols}

    missing_in_postgres = sqlite_col_names - postgres_col_names
    missing_in_sqlite = postgres_col_names - sqlite_col_names

    if missing_in_postgres:
        differences.append({
            "type": "missing_in_postgres",
            "columns": list(missing_in_postgres)
        })

    if missing_in_sqlite:
        differences.append({
            "type": "missing_in_sqlite",
            "columns": list(missing_in_sqlite)
        })

    # Check for type mismatches in common columns
    common_cols = sqlite_col_names & postgres_col_names
    for col_name in common_cols:
        sqlite_type = next(c['type'] for c in sqlite_cols if c['name'] == col_name)
        postgres_type = next(c['type'] for c in postgres_cols if c['name'] == col_name)

        if not types_compatible(sqlite_type, postgres_type):
            differences.append({
                "type": "type_mismatch",
                "column": col_name,
                "sqlite_type": sqlite_type,
                "postgres_type": postgres_type
            })

    return differences
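
`compare_table_columns` relies on two helpers, `get_sqlite_columns` and `get_postgres_columns`, that are not shown above. A minimal sketch of what they might look like follows; the function bodies are assumptions. The SQLite version uses `PRAGMA table_info`, and the Postgres version queries `information_schema.columns` through a psycopg 3 style `conn.execute()`.

```python
import sqlite3


def get_sqlite_columns(table, conn):
    """Return [{'name': ..., 'type': ...}] for a SQLite table."""
    # PRAGMA statements cannot take bound parameters, so quote the identifier.
    quoted = '"' + table.replace('"', '""') + '"'
    rows = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
    # table_info rows are: (cid, name, type, notnull, dflt_value, pk)
    return [{"name": row[1], "type": row[2]} for row in rows]


def get_postgres_columns(table, conn, schema="public"):
    """Return [{'name': ..., 'type': ...}] for a Postgres table.

    Assumes `conn` is a psycopg 3 connection (hypothetical setup).
    """
    rows = conn.execute(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        (schema, table),
    ).fetchall()
    return [{"name": row[0], "type": row[1]} for row in rows]


# Quick check against an in-memory SQLite database.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE t (id TEXT PRIMARY KEY, amount REAL)")
print(get_sqlite_columns("t", demo))
```

Note that `information_schema.columns` reports type names in lower case and in long form (for example `character varying` or `timestamp with time zone`), so any comparison needs to normalize them first.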

Type Mapping Between SQLite and Postgres

SQLite uses loose type affinities while Postgres enforces strict column types, so types must be mapped explicitly in each direction:

python
SQLITE_TO_POSTGRES_TYPE_MAP = {
    "TEXT": "TEXT",
    "INTEGER": "INTEGER",
    "REAL": "DOUBLE PRECISION",
    "BLOB": "BYTEA",
    "NUMERIC": "NUMERIC",
    # SQLite date/time stored as TEXT
    "TIMESTAMP": "TIMESTAMP WITH TIME ZONE",
    "DATE": "DATE",
    "TIME": "TIME",
}

POSTGRES_TO_SQLITE_TYPE_MAP = {
    "TEXT": "TEXT",
    "VARCHAR": "TEXT",
    "CHAR": "TEXT",
    "INTEGER": "INTEGER",
    "BIGINT": "INTEGER",
    "SMALLINT": "INTEGER",
    "DOUBLE PRECISION": "REAL",
    "REAL": "REAL",
    "NUMERIC": "REAL",
    "DECIMAL": "REAL",
    "BYTEA": "BLOB",
    "TIMESTAMP WITH TIME ZONE": "TEXT",
    "TIMESTAMP WITHOUT TIME ZONE": "TEXT",
    "DATE": "TEXT",
    "TIME": "TEXT",
    "BOOLEAN": "INTEGER",  # SQLite uses 0/1
}

def types_compatible(sqlite_type, postgres_type):
    """Check if SQLite and Postgres types are compatible."""
    expected_postgres_type = SQLITE_TO_POSTGRES_TYPE_MAP.get(
        sqlite_type.upper().split('(')[0],  # Remove size specifications
        None
    )

    return expected_postgres_type == postgres_type.upper().split('(')[0]
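
As written, `types_compatible` accepts exactly one Postgres type per SQLite type, so an `INTEGER` column paired with a Postgres `bigint` is reported as a mismatch. A possible relaxation, sketched here under the assumption that each SQLite type may match a set of acceptable Postgres types, is shown below. The set contents and the names `ACCEPTABLE_POSTGRES_TYPES`, `normalize`, and `types_compatible_relaxed` are illustrative.

```python
# Hypothetical relaxed check: each SQLite type accepts a set of Postgres types.
# The names are upper-cased forms of what information_schema.columns reports
# in its data_type column.
ACCEPTABLE_POSTGRES_TYPES = {
    "TEXT": {"TEXT", "CHARACTER VARYING", "CHARACTER"},
    "INTEGER": {"INTEGER", "BIGINT", "SMALLINT", "BOOLEAN"},
    "REAL": {"DOUBLE PRECISION", "REAL"},
    "NUMERIC": {"NUMERIC"},
    "BLOB": {"BYTEA"},
    "TIMESTAMP": {"TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITHOUT TIME ZONE"},
}


def normalize(type_name):
    """Upper-case a type name and drop any size suffix such as (255)."""
    return type_name.upper().split("(")[0].strip()


def types_compatible_relaxed(sqlite_type, postgres_type):
    """Return True if the Postgres type is an accepted match for the SQLite type."""
    accepted = ACCEPTABLE_POSTGRES_TYPES.get(normalize(sqlite_type), set())
    return normalize(postgres_type) in accepted


print(types_compatible_relaxed("INTEGER", "bigint"))
print(types_compatible_relaxed("VARCHAR(255)", "text"))
```

An unknown SQLite type (such as `VARCHAR` here) still counts as incompatible, which keeps the check conservative.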

Migration Generation

Auto-Generate Migrations from Schema Diff

Create Supabase migrations automatically:

python
def generate_migration_from_diff(differences, migration_name):
    """
    Generate Supabase migration SQL from schema differences.

    Args:
        differences: Output from compare_schemas()
        migration_name: Name for the migration

    Returns:
        str: Migration SQL
    """
    sql_statements = []

    # Create missing tables in Postgres
    for table in differences["missing_in_postgres"]:
        sql_statements.append(
            generate_create_table_sql(table, "postgres")
        )

    # Handle column mismatches
    for mismatch in differences["column_mismatches"]:
        table = mismatch["table"]
        for diff in mismatch["differences"]:
            if diff["type"] == "missing_in_postgres":
                for col in diff["columns"]:
                    sql_statements.append(
                        f"ALTER TABLE {table} ADD COLUMN {col} {get_column_definition(table, col)};"
                    )
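            elif diff["type"] == "type_mismatch":
                col = diff["column"]
                # Target the Postgres equivalent of the SQLite type; reusing
                # diff["postgres_type"] would re-apply the type the column already has.
                base_type = diff["sqlite_type"].upper().split("(")[0]
                new_type = SQLITE_TO_POSTGRES_TYPE_MAP.get(base_type)
                if new_type is None:
                    # No known mapping: leave a note for manual review.
                    sql_statements.append(f"-- TODO: no Postgres mapping for {table}.{col}")
                    continue
                sql_statements.append(
                    f"ALTER TABLE {table} ALTER COLUMN {col} TYPE {new_type};"
                )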

    # Combine into migration
    migration_sql = "-- Auto-generated migration from schema diff\n"
    migration_sql += f"-- Migration: {migration_name}\n\n"
    migration_sql += "\n".join(sql_statements)

    return migration_sql

def create_supabase_migration(migration_sql, migration_name):
    """Create Supabase migration file."""
    import subprocess

    # Create migration via Supabase CLI
    result = subprocess.run(
        ["supabase", "migration", "new", migration_name],
        capture_output=True,
        text=True
    )

    if result.returncode != 0:
        raise RuntimeError(f"Failed to create migration: {result.stderr}")

    # Extract migration file path from output
    # Output format: "Created new migration at supabase/migrations/20240101000000_migration_name.sql"
    migration_file = extract_migration_path(result.stdout)

    # Write SQL to migration file
    with open(migration_file, 'w') as f:
        f.write(migration_sql)

    return migration_file
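
`extract_migration_path` is not defined above. A minimal sketch, assuming the CLI prints the path in the format quoted in the comment ("Created new migration at ..."), could use a regular expression. The exact output may differ between Supabase CLI versions, so the pattern here is an assumption.

```python
import re


def extract_migration_path(cli_output):
    """Pull the .sql migration path out of the CLI's output text."""
    # Assumed format: "Created new migration at supabase/migrations/<ts>_<name>.sql"
    match = re.search(r"(\S*supabase/migrations/\S+\.sql)", cli_output)
    if match is None:
        raise ValueError(f"No migration path found in: {cli_output!r}")
    return match.group(1)


sample = "Created new migration at supabase/migrations/20240101000000_sync_from_sqlite.sql\n"
print(extract_migration_path(sample))
```

Raising on a failed match keeps the caller from writing the migration SQL to an unintended path.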

Synchronization Strategies

Strategy 1: SQLite to Postgres (One-Way)

Use SQLite as source of truth, sync to Postgres:

python
def sync_sqlite_to_postgres(sqlite_db, postgres_conn):
    """Sync SQLite schema to Postgres."""
    differences = compare_schemas(sqlite_db, postgres_conn)

    if not has_differences(differences):
        print("Schemas are in sync")
        return

    # Generate migration
    migration_sql = generate_migration_from_diff(
        differences,
        "sync_from_sqlite"
    )

    # Create Supabase migration
    migration_file = create_supabase_migration(
        migration_sql,
        "sync_from_sqlite"
    )

    print(f"Created migration: {migration_file}")

    # Optionally apply migration
    if confirm("Apply migration now?"):
        apply_migration(migration_file)
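
The helpers `has_differences` and `confirm` used above are not defined in this document. Minimal versions might look like the following; both bodies are assumptions.

```python
def has_differences(differences):
    """Return True if any category in the diff dict is non-empty."""
    return any(differences.values())


def confirm(prompt):
    """Ask a yes/no question on the terminal; anything but y/yes means no."""
    answer = input(f"{prompt} [y/N] ").strip().lower()
    return answer in ("y", "yes")


clean = {
    "missing_in_postgres": [],
    "missing_in_sqlite": [],
    "column_mismatches": [],
    "index_differences": [],
}
drifted = dict(clean, missing_in_postgres=["opengov_projects"])
print(has_differences(clean), has_differences(drifted))
```

Because `confirm` reads from standard input, it is only suitable for interactive runs; a CI job would need a non-interactive path instead.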

Strategy 2: Postgres to SQLite (Reverse Sync)

Pull Postgres schema to SQLite:

python
def sync_postgres_to_sqlite(postgres_conn, sqlite_db):
    """Sync Postgres schema to SQLite."""
    differences = compare_schemas(sqlite_db, postgres_conn)

    # Generate SQLite DDL
    for table in differences["missing_in_sqlite"]:
        create_table_sql = generate_create_table_sql(table, "sqlite")
        sqlite_db.execute(create_table_sql)

    sqlite_db.commit()

Strategy 3: Bidirectional Sync with Conflict Resolution

Handle conflicts when both schemas have changes:

python
def sync_bidirectional(sqlite_db, postgres_conn, conflict_resolution="postgres_wins"):
    """
    Bidirectional sync with conflict resolution.

    Args:
        conflict_resolution: "postgres_wins", "sqlite_wins", or "merge"
    """
    differences = compare_schemas(sqlite_db, postgres_conn)

    if conflict_resolution == "postgres_wins":
        sync_postgres_to_sqlite(postgres_conn, sqlite_db)
    elif conflict_resolution == "sqlite_wins":
        sync_sqlite_to_postgres(sqlite_db, postgres_conn)
    elif conflict_resolution == "merge":
        merge_schemas(sqlite_db, postgres_conn, differences)
    else:
        raise ValueError(f"Unknown resolution strategy: {conflict_resolution}")
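
`merge_schemas` is left undefined here. One conservative reading of "merge", sketched below as an assumption, is to create each missing table or column on the side that lacks it and to report type mismatches as conflicts that need a manual decision. The function name `plan_merge` and the plan format are hypothetical; the sketch only produces a plan and does not touch either database.

```python
def plan_merge(differences):
    """Turn a compare_schemas()-style diff into a list of planned actions."""
    plan = []
    for table in sorted(differences["missing_in_postgres"]):
        plan.append(("create_in_postgres", table))
    for table in sorted(differences["missing_in_sqlite"]):
        plan.append(("create_in_sqlite", table))
    for mismatch in differences["column_mismatches"]:
        for diff in mismatch["differences"]:
            if diff["type"] == "type_mismatch":
                # Neither side is obviously right, so flag it for a human.
                plan.append(("manual_review", f"{mismatch['table']}.{diff['column']}"))
            else:
                # Column exists on one side only: add it to the other side.
                target = diff["type"].replace("missing_in_", "")
                for col in sorted(diff["columns"]):
                    plan.append((f"add_column_in_{target}", f"{mismatch['table']}.{col}"))
    return plan


example = {
    "missing_in_postgres": ["opengov_opportunities"],
    "missing_in_sqlite": [],
    "column_mismatches": [
        {
            "table": "opengov_projects",
            "differences": [
                {"type": "missing_in_sqlite", "columns": ["owner"]},
                {"type": "type_mismatch", "column": "extracted",
                 "sqlite_type": "INTEGER", "postgres_type": "boolean"},
            ],
        }
    ],
    "index_differences": [],
}
for action in plan_merge(example):
    print(action)
```

Separating planning from execution makes it possible to review the plan, or turn it into a migration file, before anything is applied.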

Schema Versioning

Track Schema Version in Database

Store schema version to detect drift:

python
def get_schema_version(conn, db_type="sqlite"):
    """Get current schema version from database."""
    try:
        if db_type == "sqlite":
            cursor = conn.execute("SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1")
        else:  # postgres
            cursor = conn.execute("SELECT version FROM public.schema_version ORDER BY applied_at DESC LIMIT 1")

        row = cursor.fetchone()
        return row[0] if row else None
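    except Exception:
        # Most likely the schema_version table doesn't exist yet. Roll back so a
        # failed query does not leave a Postgres transaction in an aborted state.
        conn.rollback()
        return None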

def set_schema_version(conn, version, db_type="sqlite"):
    """Record schema version in database."""
    if db_type == "sqlite":
        conn.execute("""
            CREATE TABLE IF NOT EXISTS schema_version (
                version TEXT NOT NULL,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,))
    else:  # postgres
        conn.execute("""
            CREATE TABLE IF NOT EXISTS public.schema_version (
                version TEXT NOT NULL,
                applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            )
        """)
        conn.execute("INSERT INTO public.schema_version (version) VALUES (%s)", (version,))

    conn.commit()
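
With a version recorded on each side, drift can be detected by comparing the latest entries. The sketch below uses two in-memory SQLite databases as stand-ins for the local and remote databases so that it runs without a Postgres server. The helper names `record_version`, `latest_version`, and `versions_match`, and the version strings, are illustrative.

```python
import sqlite3


def record_version(conn, version):
    """Create the tracking table if needed and append a version row."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_version ("
        "version TEXT NOT NULL, "
        "applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )
    conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,))
    conn.commit()


def latest_version(conn):
    """Return the most recently inserted version, or None if none recorded."""
    try:
        # rowid breaks ties when two rows share the same one-second timestamp.
        row = conn.execute(
            "SELECT version FROM schema_version "
            "ORDER BY applied_at DESC, rowid DESC LIMIT 1"
        ).fetchone()
    except sqlite3.OperationalError:
        return None  # tracking table does not exist yet
    return row[0] if row else None


def versions_match(local_conn, remote_conn):
    """True when both databases report the same latest schema version."""
    return latest_version(local_conn) == latest_version(remote_conn)


local = sqlite3.connect(":memory:")
remote = sqlite3.connect(":memory:")
record_version(local, "2024_01_add_status")
record_version(local, "2024_02_add_deadline")
record_version(remote, "2024_01_add_status")
print(versions_match(local, remote))
```

SQLite's `CURRENT_TIMESTAMP` has one-second resolution, so ordering by `applied_at` alone can be ambiguous when several versions are recorded in quick succession; the `rowid` tiebreaker in this sketch addresses that for SQLite.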

Best Practices

  1. Use Python as source of truth - Define schema in code, generate both SQLite and Postgres DDL
  2. Version everything - Track schema versions in both databases
  3. Test migrations locally first - Always test on local Supabase before remote
  4. Automate drift detection - Run garden db diff in CI/CD pipelines
  5. Handle data migration - Schema changes may require data transformation
  6. Use transactions - All schema changes should be atomic
  7. Backup before sync - Always backup before applying schema changes
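
Practice 6 can be illustrated with SQLite, where DDL statements take part in transactions. The sketch below, an assumption rather than the skill's own code, opens the connection in autocommit mode (`isolation_level=None`) and issues `BEGIN`, `COMMIT`, and `ROLLBACK` explicitly so that a failing statement undoes every earlier statement in the batch. The function name `apply_statements_atomically` and the table names are hypothetical.

```python
import sqlite3


def apply_statements_atomically(conn, statements):
    """Run all statements in one transaction; roll back everything on error."""
    conn.execute("BEGIN")
    try:
        for statement in statements:
            conn.execute(statement)
    except sqlite3.Error:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


# isolation_level=None disables the module's implicit transaction handling,
# leaving BEGIN/COMMIT/ROLLBACK fully under our control.
conn = sqlite3.connect(":memory:", isolation_level=None)

try:
    apply_statements_atomically(conn, [
        "CREATE TABLE projects (project_id TEXT PRIMARY KEY)",
        "ALTER TABLE missing_table ADD COLUMN status TEXT",  # fails on purpose
    ])
except sqlite3.OperationalError as exc:
    print(f"Migration rolled back: {exc}")

tables = conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
print(tables)
```

Postgres also supports transactional DDL, so the same all-or-nothing approach applies to the remote side.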

Common Patterns

Continuous Schema Validation

Run in CI/CD to catch drift early:

bash
#!/bin/bash
# Intended to run as a step in a CI workflow, e.g. .github/workflows/schema-validation.yml

# Check for schema drift
garden db diff

# Fail if drift detected
if [ $? -ne 0 ]; then
  echo "Schema drift detected! Run 'garden db sync' locally."
  exit 1
fi

Pre-Deployment Schema Sync

Before deploying new code, ensure schemas match:

python
def pre_deployment_check():
    """Run before deploying to ensure schemas are synced."""
    print("Checking schema synchronization...")

    differences = check_schema_drift()

    if differences:
        print("⚠️ Schema drift detected!")
        print_differences(differences)

        if not confirm("Sync schemas now?"):
            raise RuntimeError("Deployment aborted - schema not synced")

        sync_schemas()

    print("✅ Schemas are synchronized")
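
`print_differences` is not defined in this document. A minimal sketch, assuming the diff has the shape returned by `compare_schemas()`, might render one line per finding. The helper `format_differences` and the output wording are illustrative.

```python
def format_differences(differences):
    """Render a compare_schemas()-style diff as human-readable lines."""
    lines = []
    for table in sorted(differences.get("missing_in_postgres", [])):
        lines.append(f"- table {table}: missing in Postgres")
    for table in sorted(differences.get("missing_in_sqlite", [])):
        lines.append(f"- table {table}: missing in SQLite")
    for mismatch in differences.get("column_mismatches", []):
        for diff in mismatch["differences"]:
            if diff["type"] == "type_mismatch":
                lines.append(
                    f"- {mismatch['table']}.{diff['column']}: "
                    f"SQLite {diff['sqlite_type']} vs Postgres {diff['postgres_type']}"
                )
            else:
                side = "Postgres" if diff["type"] == "missing_in_postgres" else "SQLite"
                for col in sorted(diff["columns"]):
                    lines.append(f"- {mismatch['table']}.{col}: missing in {side}")
    return lines


def print_differences(differences):
    """Print each formatted finding on its own line."""
    for line in format_differences(differences):
        print(line)


print_differences({
    "missing_in_postgres": ["opengov_opportunities"],
    "column_mismatches": [{
        "table": "opengov_projects",
        "differences": [{"type": "missing_in_postgres", "columns": ["status"]}],
    }],
})
```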

Tools Integration

Supabase CLI Integration

Use Supabase CLI for migration management:

bash
# Generate new migration
supabase migration new add_column_to_projects

# Recreate the local database and re-apply all migrations (discards local data)
supabase db reset

# Push to remote
supabase db push

Garden CLI Integration

Use Garden commands for schema operations:

bash
# Check for differences
garden db diff

# Sync schemas (SQLite → Postgres)
garden db sync

# Audit schema consistency
garden db audit