This skill provides expertise in synchronizing database schemas between SQLite and Supabase Postgres. It should be used when the user asks about "schema sync", "database diff", "schema migration", "sync schemas", "schema drift", or similar database synchronization topics.
Schema Synchronization Between SQLite and Postgres
Master the patterns and techniques for keeping SQLite and Postgres schemas synchronized, detecting drift, and generating migrations automatically.
Core Synchronization Strategy
The Challenge
Many ETL pipelines use SQLite for local state management and Postgres (Supabase) for remote storage. Keeping these schemas synchronized is critical to avoid data inconsistencies.
Common Issues:
- Schema drift (tables exist in one DB but not the other)
- Column mismatches (different types, constraints)
- Index differences
- Migration state divergence
The Solution: Database-First Architecture
Define the schema once in Python code and treat that definition as the source of truth for both databases:
# src/database/schema.py
SCHEMA_DEFINITION = {
    "opengov_projects": {
        "columns": [
            ("project_id", "TEXT PRIMARY KEY"),
            ("project_name", "TEXT NOT NULL"),
            ("status", "TEXT DEFAULT 'pending'"),
            ("extracted", "INTEGER DEFAULT 0"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
            ("updated_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
        "indexes": [
            "CREATE INDEX IF NOT EXISTS idx_status ON opengov_projects(status)",
            "CREATE INDEX IF NOT EXISTS idx_extracted ON opengov_projects(extracted)",
        ]
    },
    "opengov_opportunities": {
        "columns": [
            ("id", "TEXT PRIMARY KEY"),
            ("project_id", "TEXT NOT NULL"),
            ("title", "TEXT"),
            ("description", "TEXT"),
            ("amount", "REAL"),
            ("deadline", "TEXT"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
        "indexes": [
            "CREATE INDEX IF NOT EXISTS idx_project_id ON opengov_opportunities(project_id)",
        ],
        "foreign_keys": [
            "FOREIGN KEY (project_id) REFERENCES opengov_projects(project_id)"
        ]
    }
}
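A definition like the one above only pays off if DDL for both databases can be generated from it. The following is a minimal sketch of that step, not the original module's implementation: the function name build_create_table_sql, the trimmed SAMPLE_SCHEMA, and the POSTGRES_TYPE_OVERRIDES table are all assumptions made for illustration, and the type translation only covers the types that appear in the example.

```python
# Hypothetical, trimmed-down stand-in for SCHEMA_DEFINITION.
SAMPLE_SCHEMA = {
    "opengov_projects": {
        "columns": [
            ("project_id", "TEXT PRIMARY KEY"),
            ("amount", "REAL"),
            ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
        ],
    },
    "opengov_opportunities": {
        "columns": [
            ("id", "TEXT PRIMARY KEY"),
            ("project_id", "TEXT NOT NULL"),
        ],
        "foreign_keys": [
            "FOREIGN KEY (project_id) REFERENCES opengov_projects(project_id)"
        ],
    },
}

# Assumed translation of the leading type keyword when targeting Postgres.
POSTGRES_TYPE_OVERRIDES = {
    "REAL": "DOUBLE PRECISION",
    "BLOB": "BYTEA",
    "TIMESTAMP": "TIMESTAMP WITH TIME ZONE",
}


def build_create_table_sql(table, spec, dialect="sqlite"):
    """Render a CREATE TABLE statement for one table of the schema dict."""
    lines = []
    for name, definition in spec["columns"]:
        if dialect == "postgres":
            # Split off the first word (the type) and translate it if needed.
            type_word, _, rest = definition.partition(" ")
            type_word = POSTGRES_TYPE_OVERRIDES.get(type_word.upper(), type_word)
            definition = f"{type_word} {rest}".strip()
        lines.append(f"    {name} {definition}")
    # Table-level constraints are passed through unchanged for both dialects.
    lines.extend(f"    {fk}" for fk in spec.get("foreign_keys", []))
    body = ",\n".join(lines)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{body}\n);"


postgres_ddl = build_create_table_sql(
    "opengov_projects", SAMPLE_SCHEMA["opengov_projects"], dialect="postgres"
)
```

Generating both dialects from one dictionary means a column added in Python cannot silently exist in only one database.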
Schema Diffing
Detecting Schema Differences
Compare schemas between SQLite and Postgres:
def compare_schemas(sqlite_conn, postgres_conn):
    """
    Compare schemas between SQLite and Postgres.

    Returns:
        dict: Differences found
    """
    differences = {
        "missing_in_postgres": [],
        "missing_in_sqlite": [],
        "column_mismatches": [],
        "index_differences": [],  # not populated by this function
    }

    # Get table lists
    sqlite_tables = set(get_sqlite_tables(sqlite_conn))
    postgres_tables = set(get_postgres_tables(postgres_conn))

    # Find missing tables (sorted lists keep the result stable and consistent
    # with the list-valued defaults above)
    differences["missing_in_postgres"] = sorted(sqlite_tables - postgres_tables)
    differences["missing_in_sqlite"] = sorted(postgres_tables - sqlite_tables)

    # Compare common tables
    for table in sorted(sqlite_tables & postgres_tables):
        column_diff = compare_table_columns(
            table, sqlite_conn, postgres_conn
        )
        if column_diff:
            differences["column_mismatches"].append({
                "table": table,
                "differences": column_diff
            })
    return differences

def get_sqlite_tables(conn):
    """Get list of tables in SQLite database."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    )
    return [row[0] for row in cursor.fetchall()]

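def get_postgres_tables(conn, schema='public'):
    """Get list of tables in Postgres schema."""
    # Bind the schema name as a parameter instead of interpolating it into
    # the SQL string. conn.execute() exists on psycopg 3 connections; with
    # psycopg2, create a cursor first and call its execute().
    cursor = conn.execute(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
          AND table_type = 'BASE TABLE'
        """,
        (schema,),
    )
    return [row[0] for row in cursor.fetchall()]
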
def compare_table_columns(table, sqlite_conn, postgres_conn):
    """Compare columns between SQLite and Postgres for a specific table."""
    sqlite_cols = get_sqlite_columns(table, sqlite_conn)
    postgres_cols = get_postgres_columns(table, postgres_conn)
    differences = []

    # Check for missing columns
    sqlite_col_names = {col['name'] for col in sqlite_cols}
    postgres_col_names = {col['name'] for col in postgres_cols}
    missing_in_postgres = sqlite_col_names - postgres_col_names
    missing_in_sqlite = postgres_col_names - sqlite_col_names
    if missing_in_postgres:
        differences.append({
            "type": "missing_in_postgres",
            "columns": list(missing_in_postgres)
        })
    if missing_in_sqlite:
        differences.append({
            "type": "missing_in_sqlite",
            "columns": list(missing_in_sqlite)
        })

    # Check for type mismatches in common columns
    common_cols = sqlite_col_names & postgres_col_names
    for col_name in common_cols:
        sqlite_type = next(c['type'] for c in sqlite_cols if c['name'] == col_name)
        postgres_type = next(c['type'] for c in postgres_cols if c['name'] == col_name)
        if not types_compatible(sqlite_type, postgres_type):
            differences.append({
                "type": "type_mismatch",
                "column": col_name,
                "sqlite_type": sqlite_type,
                "postgres_type": postgres_type
            })
    return differences
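compare_table_columns relies on get_sqlite_columns and get_postgres_columns, neither of which is defined in this document. The sketch below shows one plausible shape for the SQLite side, using PRAGMA table_info and returning the name/type dictionaries that the comparison code expects. It is an assumption about the helper, not its actual implementation; a Postgres counterpart would presumably read information_schema.columns and is not shown.

```python
import sqlite3


def get_sqlite_columns(table, conn):
    """Return [{'name': ..., 'type': ...}, ...] for a SQLite table.

    Assumed helper: the surrounding text calls it but does not define it.
    """
    # PRAGMA statements cannot take bound parameters, so quote the
    # identifier by hand (doubling any embedded double quotes).
    quoted = '"' + table.replace('"', '""') + '"'
    rows = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
    # table_info columns: cid, name, type, notnull, dflt_value, pk
    return [{"name": row[1], "type": row[2].upper()} for row in rows]


# Usage against a throwaway in-memory database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE opengov_projects (project_id TEXT PRIMARY KEY, extracted integer DEFAULT 0)"
)
columns = get_sqlite_columns("opengov_projects", conn)
```

Upper-casing the declared type here keeps later lookups in the type maps case-insensitive.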
Type Mapping Between SQLite and Postgres
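SQLite uses a small set of storage classes with loose type affinity, while Postgres enforces strict column types, so declared types have to be mapped explicitly: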
SQLITE_TO_POSTGRES_TYPE_MAP = {
    "TEXT": "TEXT",
    "INTEGER": "INTEGER",
    "REAL": "DOUBLE PRECISION",
    "BLOB": "BYTEA",
    "NUMERIC": "NUMERIC",
    # SQLite date/time stored as TEXT
    "TIMESTAMP": "TIMESTAMP WITH TIME ZONE",
    "DATE": "DATE",
    "TIME": "TIME",
}

POSTGRES_TO_SQLITE_TYPE_MAP = {
    "TEXT": "TEXT",
    "VARCHAR": "TEXT",
    "CHAR": "TEXT",
    "INTEGER": "INTEGER",
    "BIGINT": "INTEGER",
    "SMALLINT": "INTEGER",
    "DOUBLE PRECISION": "REAL",
    "REAL": "REAL",
    # Map to NUMERIC rather than REAL so the round trip through
    # SQLITE_TO_POSTGRES_TYPE_MAP lands back on NUMERIC and is not
    # reported as drift on the next comparison.
    "NUMERIC": "NUMERIC",
    "DECIMAL": "NUMERIC",
    "BYTEA": "BLOB",
    "TIMESTAMP WITH TIME ZONE": "TEXT",
    "TIMESTAMP WITHOUT TIME ZONE": "TEXT",
    "DATE": "TEXT",
    "TIME": "TEXT",
    "BOOLEAN": "INTEGER",  # SQLite uses 0/1
}

def types_compatible(sqlite_type, postgres_type):
    """Check if SQLite and Postgres types are compatible."""
    expected_postgres_type = SQLITE_TO_POSTGRES_TYPE_MAP.get(
        sqlite_type.upper().split('(')[0],  # Remove size specifications
        None
    )
    return expected_postgres_type == postgres_type.upper().split('(')[0]
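The exact-name lookup in types_compatible misses declared types such as VARCHAR(255) or BIGINT, because SQLite accepts almost any type name and derives a column affinity from it. One way to make the comparison more forgiving is to normalize the declared type to its affinity before consulting the map. The function below is a sketch of that idea; its name is an assumption, and the rules follow the ordered affinity rules in the SQLite documentation.

```python
def sqlite_affinity(declared_type):
    """Map a declared SQLite column type to its affinity.

    The checks run in the order SQLite applies them, so the first
    matching rule wins.
    """
    t = (declared_type or "").upper()
    if "INT" in t:
        return "INTEGER"
    if any(key in t for key in ("CHAR", "CLOB", "TEXT")):
        return "TEXT"
    if "BLOB" in t or not t.strip():
        return "BLOB"
    if any(key in t for key in ("REAL", "FLOA", "DOUB")):
        return "REAL"
    return "NUMERIC"


examples = {name: sqlite_affinity(name) for name in ("VARCHAR(255)", "BIGINT", "DOUBLE PRECISION", "TIMESTAMP")}
```

Note that a declared TIMESTAMP column gets NUMERIC affinity under these rules, so date handling still needs an explicit entry in the map rather than relying on affinity alone.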
Migration Generation
Auto-Generate Migrations from Schema Diff
Create Supabase migrations automatically:
def generate_migration_from_diff(differences, migration_name):
    """
    Generate Supabase migration SQL from schema differences.

    Args:
        differences: Output from compare_schemas()
        migration_name: Name for the migration

    Returns:
        str: Migration SQL
    """
    sql_statements = []

    # Create missing tables in Postgres
    for table in differences["missing_in_postgres"]:
        sql_statements.append(
            generate_create_table_sql(table, "postgres")
        )

    # Handle column mismatches
    for mismatch in differences["column_mismatches"]:
        table = mismatch["table"]
        for diff in mismatch["differences"]:
            if diff["type"] == "missing_in_postgres":
                for col in diff["columns"]:
                    sql_statements.append(
                        f"ALTER TABLE {table} ADD COLUMN {col} {get_column_definition(table, col)};"
                    )
            elif diff["type"] == "type_mismatch":
                col = diff["column"]
                # Target the Postgres type that corresponds to the SQLite
                # column; diff["postgres_type"] is the type Postgres already
                # has, so altering to it would change nothing.
                sqlite_base = diff["sqlite_type"].upper().split('(')[0]
                new_type = SQLITE_TO_POSTGRES_TYPE_MAP.get(sqlite_base)
                if new_type is None:
                    continue  # no known mapping; leave for manual review
                sql_statements.append(
                    f"ALTER TABLE {table} ALTER COLUMN {col} TYPE {new_type};"
                )

    # Combine into migration
    migration_sql = "-- Auto-generated migration from schema diff\n"
    migration_sql += f"-- Migration: {migration_name}\n\n"
    migration_sql += "\n".join(sql_statements)
    return migration_sql

def create_supabase_migration(migration_sql, migration_name):
    """Create Supabase migration file."""
    import subprocess

    # Create migration via Supabase CLI
    result = subprocess.run(
        ["supabase", "migration", "new", migration_name],
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"Failed to create migration: {result.stderr}")

    # Extract migration file path from output
    # Output format: "Created new migration at supabase/migrations/20240101000000_migration_name.sql"
    migration_file = extract_migration_path(result.stdout)

    # Write SQL to migration file
    with open(migration_file, 'w') as f:
        f.write(migration_sql)
    return migration_file
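create_supabase_migration depends on extract_migration_path, which is not defined here. A minimal sketch follows, assuming the CLI prints the path in the format quoted in the code comment above; the real output may differ between Supabase CLI versions, so treat the pattern as a starting point.

```python
import re


def extract_migration_path(cli_output):
    """Pull the .sql path out of the CLI output.

    Hypothetical helper: assumes a path containing supabase/migrations/
    and ending in .sql appears somewhere in the text.
    """
    match = re.search(r"(\S*supabase/migrations/\S+\.sql)", cli_output)
    if match is None:
        raise ValueError(f"No migration path found in output: {cli_output!r}")
    return match.group(1)


sample_output = (
    "Created new migration at supabase/migrations/20240101000000_migration_name.sql"
)
path = extract_migration_path(sample_output)
```

Raising on a missing match, rather than returning None, keeps the caller from opening an unintended file for writing.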
Synchronization Strategies
Strategy 1: SQLite to Postgres (One-Way)
Use SQLite as source of truth, sync to Postgres:
def sync_sqlite_to_postgres(sqlite_db, postgres_conn):
    """Sync SQLite schema to Postgres."""
    differences = compare_schemas(sqlite_db, postgres_conn)
    if not has_differences(differences):
        print("Schemas are in sync")
        return

    # Generate migration
    migration_sql = generate_migration_from_diff(
        differences,
        "sync_from_sqlite"
    )

    # Create Supabase migration
    migration_file = create_supabase_migration(
        migration_sql,
        "sync_from_sqlite"
    )
    print(f"Created migration: {migration_file}")

    # Optionally apply migration
    if confirm("Apply migration now?"):
        apply_migration(migration_file)
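sync_sqlite_to_postgres calls has_differences and confirm without defining them. The versions below are a sketch under the assumption that the diff has the dictionary-of-collections shape returned by compare_schemas; the ask parameter on confirm is an addition for testability and is not part of the original call signature.

```python
def has_differences(differences):
    """Return True if any category of the diff contains at least one entry."""
    return any(len(entries) > 0 for entries in differences.values())


def confirm(prompt, ask=input):
    """Ask a yes/no question; only an explicit 'y' or 'yes' counts as consent."""
    answer = ask(f"{prompt} [y/N] ").strip().lower()
    return answer in ("y", "yes")


empty_diff = {
    "missing_in_postgres": [],
    "missing_in_sqlite": [],
    "column_mismatches": [],
    "index_differences": [],
}
in_sync = not has_differences(empty_diff)
```

Defaulting to "no" on an empty answer is a deliberate choice here: applying a migration should require an explicit opt-in.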
Strategy 2: Postgres to SQLite (Reverse Sync)
Pull Postgres schema to SQLite:
def sync_postgres_to_sqlite(postgres_conn, sqlite_db):
    """Sync Postgres schema to SQLite."""
    differences = compare_schemas(sqlite_db, postgres_conn)

    # Generate SQLite DDL
    for table in differences["missing_in_sqlite"]:
        create_table_sql = generate_create_table_sql(table, "sqlite")
        sqlite_db.execute(create_table_sql)
    sqlite_db.commit()
Strategy 3: Bidirectional Sync with Conflict Resolution
Handle conflicts when both schemas have changes:
def sync_bidirectional(sqlite_db, postgres_conn, conflict_resolution="postgres_wins"):
    """
    Bidirectional sync with conflict resolution.

    Args:
        conflict_resolution: "postgres_wins", "sqlite_wins", or "merge"
    """
    differences = compare_schemas(sqlite_db, postgres_conn)
    if conflict_resolution == "postgres_wins":
        sync_postgres_to_sqlite(postgres_conn, sqlite_db)
    elif conflict_resolution == "sqlite_wins":
        sync_sqlite_to_postgres(sqlite_db, postgres_conn)
    elif conflict_resolution == "merge":
        merge_schemas(sqlite_db, postgres_conn, differences)
    else:
        raise ValueError(f"Unknown resolution strategy: {conflict_resolution}")
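merge_schemas is not defined in this document, so what "merge" means is open to interpretation. One conservative reading is an additive union: create on each side whatever exists only on the other, and leave type mismatches for a human to resolve. The sketch below plans such a merge without executing anything; plan_merge and the keys of the returned plan are hypothetical names.

```python
def plan_merge(differences):
    """Turn a compare_schemas()-style diff into an additive merge plan.

    Nothing is dropped or retyped automatically; type mismatches are
    collected under "manual_review".
    """
    plan = {
        "create_in_postgres": sorted(differences.get("missing_in_postgres", [])),
        "create_in_sqlite": sorted(differences.get("missing_in_sqlite", [])),
        "add_columns_postgres": [],
        "add_columns_sqlite": [],
        "manual_review": [],
    }
    for mismatch in differences.get("column_mismatches", []):
        table = mismatch["table"]
        for diff in mismatch["differences"]:
            if diff["type"] == "missing_in_postgres":
                plan["add_columns_postgres"] += [(table, c) for c in sorted(diff["columns"])]
            elif diff["type"] == "missing_in_sqlite":
                plan["add_columns_sqlite"] += [(table, c) for c in sorted(diff["columns"])]
            else:
                plan["manual_review"].append((table, diff))
    return plan


sample_diff = {
    "missing_in_postgres": ["local_cache"],
    "missing_in_sqlite": [],
    "column_mismatches": [{
        "table": "opengov_projects",
        "differences": [
            {"type": "missing_in_sqlite", "columns": ["owner", "budget"]},
            {"type": "type_mismatch", "column": "status",
             "sqlite_type": "TEXT", "postgres_type": "INTEGER"},
        ],
    }],
}
plan = plan_merge(sample_diff)
```

Separating planning from execution makes the merge reviewable before any DDL runs on either database.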
Schema Versioning
Track Schema Version in Database
Store schema version to detect drift:
def get_schema_version(conn, db_type="sqlite"):
    """Get current schema version from database."""
    try:
        if db_type == "sqlite":
            cursor = conn.execute("SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1")
        else:  # postgres
            cursor = conn.execute("SELECT version FROM public.schema_version ORDER BY applied_at DESC LIMIT 1")
        row = cursor.fetchone()
        return row[0] if row else None
    except Exception:
        # Most likely the table doesn't exist yet. A failed statement leaves a
        # Postgres transaction aborted, so roll back before the connection is reused.
        if db_type != "sqlite":
            conn.rollback()
        return None

def set_schema_version(conn, version, db_type="sqlite"):
    """Record schema version in database."""
    if db_type == "sqlite":
        conn.execute("""
            CREATE TABLE IF NOT EXISTS schema_version (
                version TEXT NOT NULL,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,))
    else:  # postgres
        conn.execute("""
            CREATE TABLE IF NOT EXISTS public.schema_version (
                version TEXT NOT NULL,
                applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            )
        """)
        conn.execute("INSERT INTO public.schema_version (version) VALUES (%s)", (version,))
    conn.commit()
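The version string passed to set_schema_version has to come from somewhere. One option, offered here as an assumption rather than the approach the original tooling takes, is to derive it from the schema definition itself, so that any change to the definition produces a new version automatically. The function name schema_fingerprint is hypothetical.

```python
import hashlib
import json


def schema_fingerprint(schema_definition):
    """Return a short, deterministic hash of a schema definition dict."""
    # sort_keys makes the result independent of dict insertion order;
    # tuples serialize as JSON arrays, so column order still matters.
    canonical = json.dumps(schema_definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


schema_a = {"projects": {"columns": [("id", "TEXT PRIMARY KEY")], "indexes": []}}
schema_b = {"projects": {"indexes": [], "columns": [("id", "TEXT PRIMARY KEY")]}}
schema_c = {"projects": {"columns": [("id", "INTEGER PRIMARY KEY")], "indexes": []}}

same_version = schema_fingerprint(schema_a) == schema_fingerprint(schema_b)
changed_version = schema_fingerprint(schema_a) != schema_fingerprint(schema_c)
```

If the fingerprint stored in either database differs from the one computed from the code, that database is behind the definition and a sync is due.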
Best Practices
- Use Python as source of truth - Define schema in code, generate both SQLite and Postgres DDL
- Version everything - Track schema versions in both databases
- Test migrations locally first - Always test on local Supabase before remote
- Automate drift detection - Run "garden db diff" in CI/CD pipelines
- Handle data migration - Schema changes may require data transformation
- Use transactions - All schema changes should be atomic
- Back up before sync - Always back up before applying schema changes
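To illustrate the "use transactions" practice on the SQLite side, the sketch below applies a batch of DDL statements inside one explicit transaction and rolls everything back if any statement fails. The function name is an assumption, and the connection is opened with isolation_level=None so that Python's sqlite3 module leaves transaction control to the explicit BEGIN/COMMIT/ROLLBACK statements. Postgres also runs most DDL transactionally, so the same pattern carries over there.

```python
import sqlite3


def apply_statements_atomically(conn, statements):
    """Run all statements in one transaction; roll back if any fails.

    Assumes `conn` was opened with isolation_level=None so the explicit
    BEGIN/COMMIT below are the only transaction control in play.
    """
    conn.execute("BEGIN")
    try:
        for sql in statements:
            conn.execute(sql)
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


def list_tables(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:", isolation_level=None)
try:
    apply_statements_atomically(conn, [
        "CREATE TABLE a (id TEXT PRIMARY KEY)",
        "CREATE TABLE a (id TEXT PRIMARY KEY)",  # fails: table already exists
    ])
except sqlite3.OperationalError:
    pass
tables_after_failure = list_tables(conn)  # the first CREATE was rolled back too

apply_statements_atomically(conn, [
    "CREATE TABLE a (id TEXT PRIMARY KEY)",
    "CREATE TABLE b (id TEXT PRIMARY KEY)",
])
tables_after_success = list_tables(conn)
```

Without the surrounding transaction, the failed batch would have left table a behind and the two databases one step further apart.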
Common Patterns
Continuous Schema Validation
Run in CI/CD to catch drift early:
#!/bin/bash
# .github/workflows/schema-validation.yml

# Check for schema drift
garden db diff

# Fail if drift detected
if [ $? -ne 0 ]; then
    echo "Schema drift detected! Run 'garden db sync' locally."
    exit 1
fi
Pre-Deployment Schema Sync
Before deploying new code, ensure schemas match:
def pre_deployment_check():
    """Run before deploying to ensure schemas are synced."""
    print("Checking schema synchronization...")
    differences = check_schema_drift()
    if differences:
        print("⚠️ Schema drift detected!")
        print_differences(differences)
        if not confirm("Sync schemas now?"):
            raise Exception("Deployment aborted - schema not synced")
        sync_schemas()
    print("✅ Schemas are synchronized")
Tools Integration
Supabase CLI Integration
Use Supabase CLI for migration management:
# Generate new migration
supabase migration new add_column_to_projects

# Apply migrations locally
supabase db reset

# Push to remote
supabase db push
Garden CLI Integration
Use Garden commands for schema operations:
# Check for differences
garden db diff

# Sync schemas (SQLite → Postgres)
garden db sync

# Audit schema consistency
garden db audit