This skill provides expertise in auditing system wiring, validating component connections, and ensuring proper configuration across Supabase, Airflow, and SQLite. It should be used when the user asks about "audit system", "validate wiring", "check connections", "verify configuration", or similar validation topics.
System Auditing and Wiring Validation
Master the techniques for validating that all system components are properly connected, configured, and functioning together correctly.
Core Audit Areas
1. Database Connectivity
Verify all database connections work:
def audit_database_connectivity():
"""Audit database connectivity for all configured databases."""
results = {
"sqlite": audit_sqlite_connection(),
"supabase": audit_supabase_connection(),
}
return results
def audit_sqlite_connection():
"""Test SQLite database connectivity."""
try:
db_path = os.getenv('SQLITE_DB_PATH', 'data/db/opengov_state.db')
if not os.path.exists(db_path):
return {
"status": "error",
"message": f"Database file not found: {db_path}"
}
conn = sqlite3.connect(db_path)
conn.execute('PRAGMA journal_mode') # Test query
conn.close()
return {
"status": "healthy",
"path": db_path,
"message": "SQLite connection successful"
}
except Exception as e:
return {
"status": "error",
"message": f"SQLite connection failed: {e}"
}
def audit_supabase_connection():
"""Test Supabase connectivity."""
try:
supabase_url = os.getenv('SUPABASE_URL')
supabase_key = os.getenv('SUPABASE_ANON_KEY')
if not supabase_url or not supabase_key:
return {
"status": "error",
"message": "SUPABASE_URL or SUPABASE_ANON_KEY not configured"
}
# Test connection
import requests
response = requests.get(
f"{supabase_url}/rest/v1/",
headers={"apikey": supabase_key},
timeout=10
)
if response.status_code == 200:
return {
"status": "healthy",
"url": supabase_url,
"message": "Supabase connection successful"
}
else:
return {
"status": "error",
"message": f"Supabase returned status {response.status_code}"
}
except Exception as e:
return {
"status": "error",
"message": f"Supabase connection failed: {e}"
}
2. Environment Variable Validation
Validate all required environment variables are set:
REQUIRED_ENV_VARS = {
"core": [
"DB_STORAGE_TARGET",
],
"supabase": [
"SUPABASE_URL",
"SUPABASE_ANON_KEY",
"SUPABASE_SERVICE_ROLE_KEY",
],
"airflow": [
"AIRFLOW_HOME",
"AIRFLOW__CORE__DAGS_FOLDER",
],
"sqlite": [
"SQLITE_DB_PATH",
],
}
def audit_environment_variables():
"""Audit environment variable configuration."""
results = {
"missing": [],
"present": [],
"warnings": [],
}
# Check core variables
for var in REQUIRED_ENV_VARS["core"]:
if not os.getenv(var):
results["missing"].append({
"variable": var,
"category": "core",
"severity": "critical"
})
else:
results["present"].append(var)
# Check Supabase variables if Supabase is enabled
if os.getenv("DB_STORAGE_TARGET") == "supabase":
for var in REQUIRED_ENV_VARS["supabase"]:
if not os.getenv(var):
results["missing"].append({
"variable": var,
"category": "supabase",
"severity": "critical"
})
else:
results["present"].append(var)
# Check Airflow variables if Airflow detected
airflow_home = os.getenv("AIRFLOW_HOME")
if airflow_home and os.path.exists(airflow_home):
for var in REQUIRED_ENV_VARS["airflow"]:
if not os.getenv(var):
results["warnings"].append({
"variable": var,
"category": "airflow",
"severity": "warning",
"message": "Airflow detected but variable not set"
})
return results
3. Airflow DAG Wiring
Validate that Airflow DAGs are properly wired to databases:
def audit_airflow_dag_wiring():
"""Audit Airflow DAG database connections."""
results = {
"dags_checked": 0,
"issues": [],
"healthy": [],
}
# Find DAG files
dags_folder = os.getenv("AIRFLOW__CORE__DAGS_FOLDER", "dags")
if not os.path.exists(dags_folder):
results["issues"].append({
"type": "missing_dags_folder",
"message": f"DAGs folder not found: {dags_folder}"
})
return results
# Check each DAG file
for dag_file in Path(dags_folder).glob("*.py"):
results["dags_checked"] += 1
issues = check_dag_file_wiring(dag_file)
if issues:
results["issues"].extend(issues)
else:
results["healthy"].append(str(dag_file))
return results
def check_dag_file_wiring(dag_file):
"""Check a single DAG file for proper wiring."""
issues = []
try:
content = dag_file.read_text()
# Check for DBAdapter import
if "DBAdapter" in content or "db_adapter" in content:
# Check if DB_STORAGE_TARGET is referenced
if "DB_STORAGE_TARGET" not in content:
issues.append({
"file": str(dag_file),
"type": "missing_db_config_check",
"message": "DAG uses DBAdapter but doesn't check DB_STORAGE_TARGET"
})
# Check for proper error handling
if "try:" not in content or "except" not in content:
issues.append({
"file": str(dag_file),
"type": "missing_error_handling",
"message": "DAG lacks try/except error handling"
})
# Check for hardcoded database paths
if "data/db/" in content and "os.getenv" not in content:
issues.append({
"file": str(dag_file),
"type": "hardcoded_db_path",
"message": "DAG contains hardcoded database path"
})
except Exception as e:
issues.append({
"file": str(dag_file),
"type": "parse_error",
"message": f"Failed to parse DAG file: {e}"
})
return issues
4. Schema Consistency
Verify schemas are synchronized:
def audit_schema_consistency():
"""Audit schema consistency between SQLite and Postgres."""
results = {
"status": "unknown",
"differences": [],
"recommendations": [],
}
try:
# Connect to both databases
sqlite_conn = get_sqlite_connection()
postgres_conn = get_postgres_connection()
# Compare schemas
differences = compare_schemas(sqlite_conn, postgres_conn)
if not differences["missing_in_postgres"] and \
not differences["missing_in_sqlite"] and \
not differences["column_mismatches"]:
results["status"] = "synced"
results["message"] = "Schemas are synchronized"
else:
results["status"] = "drift_detected"
results["differences"] = differences
# Generate recommendations
if differences["missing_in_postgres"]:
results["recommendations"].append(
"Run 'garden db sync' to sync missing tables to Postgres"
)
if differences["column_mismatches"]:
results["recommendations"].append(
"Review column mismatches and create migration"
)
except Exception as e:
results["status"] = "error"
results["message"] = f"Schema audit failed: {e}"
return results
5. File Structure Validation
Verify expected project structure exists:
EXPECTED_STRUCTURE = {
"directories": [
"data/db",
"data/exports",
"supabase/migrations",
"dags",
"src/database",
"logs",
],
"files": [
".env",
"config/garden.yml",
"src/database/schema.py",
],
}
def audit_file_structure():
"""Audit project file structure."""
results = {
"missing_directories": [],
"missing_files": [],
"present": [],
}
# Check directories
for directory in EXPECTED_STRUCTURE["directories"]:
if os.path.exists(directory):
results["present"].append(directory)
else:
results["missing_directories"].append({
"path": directory,
"severity": "warning",
"message": f"Expected directory not found: {directory}"
})
# Check files
for file_path in EXPECTED_STRUCTURE["files"]:
if os.path.exists(file_path):
results["present"].append(file_path)
else:
results["missing_files"].append({
"path": file_path,
"severity": "warning" if file_path.startswith("config/") else "error",
"message": f"Expected file not found: {file_path}"
})
return results
Comprehensive System Audit
Full System Audit Report
Run all audits and generate comprehensive report:
def run_full_system_audit():
"""Run comprehensive system audit."""
report = {
"timestamp": datetime.now().isoformat(),
"overall_status": "unknown",
"audits": {
"database_connectivity": audit_database_connectivity(),
"environment_variables": audit_environment_variables(),
"airflow_dag_wiring": audit_airflow_dag_wiring(),
"schema_consistency": audit_schema_consistency(),
"file_structure": audit_file_structure(),
},
"critical_issues": [],
"warnings": [],
"recommendations": [],
}
# Collect issues across all audits
for audit_name, audit_result in report["audits"].items():
if isinstance(audit_result, dict):
# Extract issues
if "issues" in audit_result:
for issue in audit_result["issues"]:
if issue.get("severity") == "critical":
report["critical_issues"].append({
"audit": audit_name,
**issue
})
else:
report["warnings"].append({
"audit": audit_name,
**issue
})
# Extract recommendations
if "recommendations" in audit_result:
report["recommendations"].extend(audit_result["recommendations"])
# Determine overall status
if report["critical_issues"]:
report["overall_status"] = "unhealthy"
elif report["warnings"]:
report["overall_status"] = "degraded"
else:
report["overall_status"] = "healthy"
return report
def format_audit_report(report):
"""Format audit report for display."""
lines = []
lines.append("=" * 60)
lines.append("SYSTEM AUDIT REPORT")
lines.append("=" * 60)
lines.append(f"Timestamp: {report['timestamp']}")
lines.append(f"Overall Status: {report['overall_status'].upper()}")
lines.append("")
if report["critical_issues"]:
lines.append("🔴 CRITICAL ISSUES:")
for issue in report["critical_issues"]:
lines.append(f" - [{issue['audit']}] {issue.get('message', issue.get('type'))}")
lines.append("")
if report["warnings"]:
lines.append("⚠️ WARNINGS:")
for warning in report["warnings"]:
lines.append(f" - [{warning['audit']}] {warning.get('message', warning.get('type'))}")
lines.append("")
if report["recommendations"]:
lines.append("💡 RECOMMENDATIONS:")
for rec in report["recommendations"]:
lines.append(f" - {rec}")
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
Automated Remediation
Auto-Fix Common Issues
Attempt to automatically fix common configuration issues:
def auto_remediate_issues(audit_report):
"""Attempt automatic remediation of audit issues."""
remediation_results = {
"attempted": [],
"successful": [],
"failed": [],
}
for issue in audit_report["critical_issues"]:
remediation_results["attempted"].append(issue)
try:
if issue["type"] == "missing_dags_folder":
# Create DAGs folder
os.makedirs("dags", exist_ok=True)
remediation_results["successful"].append({
**issue,
"action": "Created dags/ directory"
})
elif issue["type"] == "missing_db_config":
# Generate default .env
generate_default_env()
remediation_results["successful"].append({
**issue,
"action": "Generated default .env file"
})
elif issue["type"] == "missing_directory":
# Create missing directory
os.makedirs(issue["path"], exist_ok=True)
remediation_results["successful"].append({
**issue,
"action": f"Created directory: {issue['path']}"
})
except Exception as e:
remediation_results["failed"].append({
**issue,
"error": str(e)
})
return remediation_results
Continuous Monitoring
Scheduled Audit Checks
Run audits on a schedule:
def schedule_periodic_audits():
"""Set up periodic system audits."""
import schedule
def run_and_log_audit():
report = run_full_system_audit()
log_audit_report(report)
if report["overall_status"] == "unhealthy":
send_alert(report)
# Run every hour
schedule.every(1).hours.do(run_and_log_audit)
# Run on startup
run_and_log_audit()
return schedule
Best Practices
- •Audit regularly - Run full system audit daily or before deployments
- •Log all audit results - Keep audit history for trend analysis
- •Alert on critical issues - Notify team when critical problems detected
- •Auto-remediate safe issues - Fix common problems automatically
- •Version configuration - Track config changes over time
- •Test in isolation - Test each component independently
- •Document expected state - Clear documentation of proper configuration
- •Fail fast - Stop deployments if critical issues found
Integration with Garden CLI
Use Garden commands for auditing:
# Run full system audit garden system health # Audit specific component garden db audit # Auto-remediate issues garden system repair
Audit Reports
Export Audit Reports
Save audit results for historical analysis:
def export_audit_report(report, format="json"):
"""Export audit report to file."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = Path(".claude/logs/audits")
output_dir.mkdir(parents=True, exist_ok=True)
if format == "json":
output_file = output_dir / f"audit_{timestamp}.json"
with open(output_file, 'w') as f:
json.dump(report, f, indent=2)
elif format == "markdown":
output_file = output_dir / f"audit_{timestamp}.md"
with open(output_file, 'w') as f:
f.write(format_audit_report_markdown(report))
return output_file