AgentSkillsCN

System Auditing

系统审计

SKILL.md

This skill provides expertise in auditing system wiring, validating component connections, and ensuring proper configuration across Supabase, Airflow, and SQLite. It should be used when the user asks about "audit system", "validate wiring", "check connections", "verify configuration", or similar validation topics.

System Auditing and Wiring Validation

Master the techniques for validating that all system components are properly connected, configured, and functioning together correctly.

Core Audit Areas

1. Database Connectivity

Verify all database connections work:

python
def audit_database_connectivity():
    """Probe each database backend and gather the outcomes in one mapping.

    Returns:
        A dict keyed by backend name ("sqlite", "supabase"); each value is
        whatever the corresponding connection audit returned.
    """
    # Keyword arguments are evaluated left to right, so SQLite is probed
    # before Supabase.
    return dict(
        sqlite=audit_sqlite_connection(),
        supabase=audit_supabase_connection(),
    )

def audit_sqlite_connection():
    """Test SQLite database connectivity.

    The database path is read from the ``SQLITE_DB_PATH`` environment
    variable and falls back to ``data/db/opengov_state.db``.

    Returns:
        A dict with ``status`` ("healthy" or "error") and ``message``; a
        healthy result also carries the probed ``path``. Failures are
        reported through the returned dict rather than raised.
    """
    try:
        db_path = os.getenv('SQLITE_DB_PATH', 'data/db/opengov_state.db')

        # sqlite3.connect() creates a missing file, so check for it first
        # instead of letting the probe fabricate an empty database.
        if not os.path.exists(db_path):
            return {
                "status": "error",
                "message": f"Database file not found: {db_path}"
            }

        conn = sqlite3.connect(db_path)
        try:
            conn.execute('PRAGMA journal_mode')  # Test query
        finally:
            # Close even when the test query fails so the handle is not leaked.
            conn.close()

        return {
            "status": "healthy",
            "path": db_path,
            "message": "SQLite connection successful"
        }

    except Exception as e:
        # Audit boundary: any failure becomes an "error" result so that one
        # broken probe cannot abort the surrounding audit run.
        return {
            "status": "error",
            "message": f"SQLite connection failed: {e}"
        }

def audit_supabase_connection():
    """Check that the Supabase REST endpoint answers with the anon key.

    Reads ``SUPABASE_URL`` and ``SUPABASE_ANON_KEY`` from the environment
    and issues a single GET against ``<url>/rest/v1/`` with a 10 second
    timeout.

    Returns:
        A dict with ``status`` ("healthy" or "error") and ``message``; a
        healthy result also carries the ``url``. Nothing is raised: every
        failure, including a missing ``requests`` package, is reported as
        an "error" result.
    """
    try:
        base_url = os.getenv('SUPABASE_URL')
        anon_key = os.getenv('SUPABASE_ANON_KEY')

        # Guard: both settings are needed before any request is attempted.
        if not (base_url and anon_key):
            return {
                "status": "error",
                "message": "SUPABASE_URL or SUPABASE_ANON_KEY not configured"
            }

        # Imported inside the try so an ImportError is reported, not raised.
        import requests
        endpoint = f"{base_url}/rest/v1/"
        response = requests.get(endpoint, headers={"apikey": anon_key}, timeout=10)

        # Anything other than a plain 200 counts as a failed probe.
        if response.status_code != 200:
            return {
                "status": "error",
                "message": f"Supabase returned status {response.status_code}"
            }

        return {
            "status": "healthy",
            "url": base_url,
            "message": "Supabase connection successful"
        }

    except Exception as e:
        return {
            "status": "error",
            "message": f"Supabase connection failed: {e}"
        }

2. Environment Variable Validation

Validate all required environment variables are set:

python
# Environment variables each subsystem needs, grouped by category.
# NOTE(review): the "sqlite" category is declared but not audited by
# audit_environment_variables() below — confirm whether that is intended.
REQUIRED_ENV_VARS = {
    "core": [
        "DB_STORAGE_TARGET",
    ],
    "supabase": [
        "SUPABASE_URL",
        "SUPABASE_ANON_KEY",
        "SUPABASE_SERVICE_ROLE_KEY",
    ],
    "airflow": [
        "AIRFLOW_HOME",
        "AIRFLOW__CORE__DAGS_FOLDER",
    ],
    "sqlite": [
        "SQLITE_DB_PATH",
    ],
}


def _check_env_category(results, category, bucket, severity, message=None):
    """Sort one category's variables into ``results["present"]`` or *bucket*."""
    for var in REQUIRED_ENV_VARS[category]:
        if os.getenv(var):
            results["present"].append(var)
            continue

        entry = {
            "variable": var,
            "category": category,
            "severity": severity,
        }
        if message is not None:
            entry["message"] = message
        results[bucket].append(entry)


def audit_environment_variables():
    """Audit environment variable configuration.

    Core variables are always checked. Supabase variables are checked only
    when ``DB_STORAGE_TARGET`` is ``"supabase"``; Airflow variables only
    when ``AIRFLOW_HOME`` is set and points at an existing path. A variable
    that is unset or empty counts as missing.

    Returns:
        A dict with three lists:
        ``missing`` — critical findings (core / Supabase variables),
        ``present`` — names of every checked variable that is set,
        ``warnings`` — non-critical findings (Airflow variables).
    """
    results = {
        "missing": [],
        "present": [],
        "warnings": [],
    }

    # Check core variables
    _check_env_category(results, "core", "missing", "critical")

    # Check Supabase variables if Supabase is enabled
    if os.getenv("DB_STORAGE_TARGET") == "supabase":
        _check_env_category(results, "supabase", "missing", "critical")

    # Check Airflow variables if Airflow detected. Variables that are set
    # are recorded under "present" just like the other categories.
    airflow_home = os.getenv("AIRFLOW_HOME")
    if airflow_home and os.path.exists(airflow_home):
        _check_env_category(
            results,
            "airflow",
            "warnings",
            "warning",
            "Airflow detected but variable not set",
        )

    return results

3. Airflow DAG Wiring

Validate that Airflow DAGs are properly wired to databases:

python
def audit_airflow_dag_wiring():
    """Inspect every top-level ``*.py`` file in the DAGs folder for wiring issues.

    The folder comes from ``AIRFLOW__CORE__DAGS_FOLDER`` and defaults to
    ``dags``. Each file is handed to ``check_dag_file_wiring``.

    Returns:
        A dict with ``dags_checked`` (number of files inspected),
        ``issues`` (all findings, flattened) and ``healthy`` (paths of
        files that produced no findings). When the folder does not exist,
        the only finding is a ``missing_dags_folder`` entry.
    """
    dags_folder = os.getenv("AIRFLOW__CORE__DAGS_FOLDER", "dags")

    checked = 0
    findings = []
    clean_files = []

    if os.path.exists(dags_folder):
        for dag_path in Path(dags_folder).glob("*.py"):
            checked += 1
            problems = check_dag_file_wiring(dag_path)
            if problems:
                findings.extend(problems)
            else:
                clean_files.append(str(dag_path))
    else:
        # Nothing to scan: report the missing folder as the sole finding.
        findings.append({
            "type": "missing_dags_folder",
            "message": f"DAGs folder not found: {dags_folder}"
        })

    return {
        "dags_checked": checked,
        "issues": findings,
        "healthy": clean_files,
    }

def check_dag_file_wiring(dag_file):
    """Check a single DAG file for proper wiring.

    The checks are plain substring heuristics on the file's text; the file
    is neither imported nor parsed as Python.

    Args:
        dag_file: Path to the DAG source file (``str`` or ``pathlib.Path``).

    Returns:
        A list of issue dicts, each with ``file``, ``type`` and ``message``.
        An empty list means no issue was found. A file that cannot be read
        or decoded yields a single ``parse_error`` issue.
    """
    try:
        # Read as UTF-8 explicitly; the platform default encoding would turn
        # valid UTF-8 sources into decode errors on some systems.
        with open(dag_file, encoding="utf-8") as handle:
            content = handle.read()
    except (OSError, ValueError) as e:
        # ValueError also covers UnicodeDecodeError.
        return [{
            "file": str(dag_file),
            "type": "parse_error",
            "message": f"Failed to parse DAG file: {e}"
        }]

    issues = []

    # Check for DBAdapter import
    if "DBAdapter" in content or "db_adapter" in content:
        # Check if DB_STORAGE_TARGET is referenced
        if "DB_STORAGE_TARGET" not in content:
            issues.append({
                "file": str(dag_file),
                "type": "missing_db_config_check",
                "message": "DAG uses DBAdapter but doesn't check DB_STORAGE_TARGET"
            })

        # Check for proper error handling
        if "try:" not in content or "except" not in content:
            issues.append({
                "file": str(dag_file),
                "type": "missing_error_handling",
                "message": "DAG lacks try/except error handling"
            })

    # Check for hardcoded database paths: a "data/db/" literal in a file
    # that never calls os.getenv is treated as hardcoded.
    if "data/db/" in content and "os.getenv" not in content:
        issues.append({
            "file": str(dag_file),
            "type": "hardcoded_db_path",
            "message": "DAG contains hardcoded database path"
        })

    return issues

4. Schema Consistency

Verify schemas are synchronized:

python
def audit_schema_consistency():
    """Compare the SQLite and Postgres schemas and report any drift.

    Returns:
        A dict with ``status`` ("synced", "drift_detected" or "error"),
        ``differences`` and ``recommendations``. On "synced" and "error" a
        ``message`` is added; on "drift_detected" ``differences`` holds the
        comparison result. Nothing is raised: any failure is reported as
        an "error" result.
    """
    results = {
        "status": "unknown",
        "differences": [],
        "recommendations": [],
    }

    try:
        # NOTE(review): neither connection is closed here — confirm whether
        # the connection helpers own their lifetime.
        differences = compare_schemas(
            get_sqlite_connection(),
            get_postgres_connection(),
        )

        if (
            differences["missing_in_postgres"]
            or differences["missing_in_sqlite"]
            or differences["column_mismatches"]
        ):
            results["status"] = "drift_detected"
            results["differences"] = differences

            # Only two of the three drift kinds produce a recommendation.
            advice = results["recommendations"]
            if differences["missing_in_postgres"]:
                advice.append(
                    "Run 'garden db sync' to sync missing tables to Postgres"
                )
            if differences["column_mismatches"]:
                advice.append(
                    "Review column mismatches and create migration"
                )
        else:
            results["status"] = "synced"
            results["message"] = "Schemas are synchronized"

    except Exception as e:
        results["status"] = "error"
        results["message"] = f"Schema audit failed: {e}"

    return results

5. File Structure Validation

Verify expected project structure exists:

python
# Paths (relative to the current working directory) that the project
# layout is expected to contain.
EXPECTED_STRUCTURE = {
    "directories": [
        "data/db",
        "data/exports",
        "supabase/migrations",
        "dags",
        "src/database",
        "logs",
    ],
    "files": [
        ".env",
        "config/garden.yml",
        "src/database/schema.py",
    ],
}


def audit_file_structure():
    """Audit project file structure.

    Every entry of ``EXPECTED_STRUCTURE`` is resolved against the current
    working directory. A directory entry must be a directory and a file
    entry must be a regular file; a path of the wrong kind counts as
    missing.

    Returns:
        A dict with ``missing_directories`` and ``missing_files`` (lists of
        finding dicts with ``type``, ``path``, ``severity`` and
        ``message``) and ``present`` (list of paths that were found).
        Missing directories are warnings; missing files are errors, except
        files under ``config/``, which are warnings.
    """
    results = {
        "missing_directories": [],
        "missing_files": [],
        "present": [],
    }

    # Check directories
    for directory in EXPECTED_STRUCTURE["directories"]:
        # isdir, not exists: a regular file named e.g. "logs" must not pass.
        if os.path.isdir(directory):
            results["present"].append(directory)
        else:
            results["missing_directories"].append({
                # "type" lets auto_remediate_issues() dispatch on the finding.
                "type": "missing_directory",
                "path": directory,
                "severity": "warning",
                "message": f"Expected directory not found: {directory}"
            })

    # Check files
    for file_path in EXPECTED_STRUCTURE["files"]:
        # isfile, not exists: a directory at this path must not pass.
        if os.path.isfile(file_path):
            results["present"].append(file_path)
        else:
            results["missing_files"].append({
                "type": "missing_file",
                "path": file_path,
                "severity": "warning" if file_path.startswith("config/") else "error",
                "message": f"Expected file not found: {file_path}"
            })

    return results

Comprehensive System Audit

Full System Audit Report

Run all audits and generate a comprehensive report:

python
# Keys under which the individual audits return lists of finding dicts.
_FINDING_LIST_KEYS = (
    "issues",
    "missing",
    "missing_directories",
    "missing_files",
    "warnings",
)

# Severities that make the overall status "unhealthy".
_CRITICAL_SEVERITIES = frozenset({"critical", "error"})


def _iter_audit_findings(audit_result):
    """Yield every finding dict contained in one audit's result dict."""
    # Findings reported as lists ("issues", "missing", ...).
    for key in _FINDING_LIST_KEYS:
        findings = audit_result.get(key)
        if isinstance(findings, list):
            for finding in findings:
                if isinstance(finding, dict):
                    yield finding

    # An audit that reports its own failure or drift through "status".
    status = audit_result.get("status")
    if status == "error":
        yield {
            "type": "audit_error",
            "severity": "critical",
            "message": audit_result.get("message", "Audit failed"),
        }
    elif status == "drift_detected":
        yield {
            "type": "schema_drift",
            "severity": "warning",
            "message": "Schema drift detected",
        }

    # Per-component results, e.g. {"sqlite": {"status": "error", ...}}.
    for component, component_result in audit_result.items():
        if (
            isinstance(component_result, dict)
            and component_result.get("status") == "error"
        ):
            yield {
                "type": "component_error",
                "component": component,
                "severity": "critical",
                "message": component_result.get(
                    "message", f"{component} check failed"
                ),
            }


def run_full_system_audit():
    """Run comprehensive system audit.

    Runs all five audits and folds their findings into one report.
    Findings are collected from every list an audit returns ("issues",
    "missing", "missing_directories", "missing_files", "warnings"), from a
    top-level ``status`` of "error" or "drift_detected", and from
    per-component results whose ``status`` is "error". Findings with
    severity "critical" or "error" become critical issues; everything
    else, including findings without a severity, becomes a warning.

    Returns:
        A dict with ``timestamp`` (ISO format, local time), ``audits``
        (raw result per audit), ``critical_issues``, ``warnings``,
        ``recommendations`` and ``overall_status``: "unhealthy" if there
        is any critical issue, otherwise "degraded" if there is any
        warning, otherwise "healthy".
    """
    report = {
        "timestamp": datetime.now().isoformat(),
        "overall_status": "unknown",
        "audits": {
            "database_connectivity": audit_database_connectivity(),
            "environment_variables": audit_environment_variables(),
            "airflow_dag_wiring": audit_airflow_dag_wiring(),
            "schema_consistency": audit_schema_consistency(),
            "file_structure": audit_file_structure(),
        },
        "critical_issues": [],
        "warnings": [],
        "recommendations": [],
    }

    # Collect issues across all audits
    for audit_name, audit_result in report["audits"].items():
        if not isinstance(audit_result, dict):
            continue

        for finding in _iter_audit_findings(audit_result):
            if finding.get("severity") in _CRITICAL_SEVERITIES:
                bucket = report["critical_issues"]
            else:
                bucket = report["warnings"]
            bucket.append({"audit": audit_name, **finding})

        # Extract recommendations
        report["recommendations"].extend(audit_result.get("recommendations", []))

    # Determine overall status
    if report["critical_issues"]:
        report["overall_status"] = "unhealthy"
    elif report["warnings"]:
        report["overall_status"] = "degraded"
    else:
        report["overall_status"] = "healthy"

    return report

def format_audit_report(report):
    """Render an audit report dict as a plain-text block.

    Args:
        report: Dict with ``timestamp``, ``overall_status``,
            ``critical_issues``, ``warnings`` and ``recommendations``.
            Issue and warning entries must carry an ``audit`` key; their
            text is the ``message`` if present, otherwise the ``type``.

    Returns:
        The report as a single newline-joined string. Empty sections are
        omitted.
    """
    rule = "=" * 60

    def issue_line(entry):
        return f"  - [{entry['audit']}] {entry.get('message', entry.get('type'))}"

    def recommendation_line(text):
        return f"  - {text}"

    # (heading, report key, per-entry renderer), in display order.
    sections = (
        ("🔴 CRITICAL ISSUES:", "critical_issues", issue_line),
        ("⚠️  WARNINGS:", "warnings", issue_line),
        ("💡 RECOMMENDATIONS:", "recommendations", recommendation_line),
    )

    out = [
        rule,
        "SYSTEM AUDIT REPORT",
        rule,
        f"Timestamp: {report['timestamp']}",
        f"Overall Status: {report['overall_status'].upper()}",
        "",
    ]

    for heading, key, render in sections:
        entries = report[key]
        if not entries:
            continue
        out.append(heading)
        for entry in entries:
            out.append(render(entry))
        out.append("")  # blank line after each section

    out.append(rule)
    return "\n".join(out)

Automated Remediation

Auto-Fix Common Issues

Attempt to automatically fix common configuration issues:

python
def auto_remediate_issues(audit_report):
    """Attempt automatic remediation of audit issues.

    Only the report's ``critical_issues`` are processed. Three issue types
    have a remediation: ``missing_dags_folder`` (creates the folder named
    by ``AIRFLOW__CORE__DAGS_FOLDER``, default ``dags``),
    ``missing_db_config`` (generates a default ``.env``) and
    ``missing_directory`` (creates ``issue["path"]``).

    Args:
        audit_report: Report dict containing a ``critical_issues`` list of
            issue dicts.

    Returns:
        A dict with four lists: ``attempted`` (every issue seen),
        ``successful`` (issue plus an ``action`` description), ``failed``
        (issue plus an ``error`` string) and ``skipped`` (issues with no
        known remediation, including issues without a ``type``).
    """
    remediation_results = {
        "attempted": [],
        "successful": [],
        "failed": [],
        "skipped": [],
    }

    for issue in audit_report["critical_issues"]:
        remediation_results["attempted"].append(issue)

        # .get(): findings without a "type" are skipped instead of being
        # misreported as a failed remediation via KeyError.
        issue_type = issue.get("type")

        try:
            if issue_type == "missing_dags_folder":
                # Create the folder the DAG audit actually looks for.
                dags_folder = os.getenv("AIRFLOW__CORE__DAGS_FOLDER", "dags")
                os.makedirs(dags_folder, exist_ok=True)
                action = f"Created DAGs directory: {dags_folder}"

            elif issue_type == "missing_db_config":
                # Generate default .env
                generate_default_env()
                action = "Generated default .env file"

            elif issue_type == "missing_directory":
                # Create missing directory
                os.makedirs(issue["path"], exist_ok=True)
                action = f"Created directory: {issue['path']}"

            else:
                remediation_results["skipped"].append(issue)
                continue

        except Exception as e:
            # Best effort: record the failure and carry on with the rest.
            remediation_results["failed"].append({
                **issue,
                "error": str(e)
            })
        else:
            remediation_results["successful"].append({
                **issue,
                "action": action
            })

    return remediation_results

Continuous Monitoring

Scheduled Audit Checks

Run audits on a schedule:

python
def schedule_periodic_audits():
    """Register an hourly full-system audit and run one immediately.

    Each run logs the audit report and sends an alert when the overall
    status is "unhealthy".

    Returns:
        The ``schedule`` module holding the registered job.

    NOTE(review): nothing here drives the scheduler; with the ``schedule``
    package the caller is expected to call ``run_pending()`` in a loop —
    confirm against the call site.
    """
    import schedule

    def run_and_log_audit():
        report = run_full_system_audit()
        log_audit_report(report)

        # Alerts are reserved for the "unhealthy" state.
        if report["overall_status"] != "unhealthy":
            return
        send_alert(report)

    # Register the hourly job first, then perform the startup run.
    hourly = schedule.every(1).hours
    hourly.do(run_and_log_audit)

    run_and_log_audit()

    return schedule

Best Practices

  1. Audit regularly - Run a full system audit daily and before every deployment
  2. Log all audit results - Keep an audit history for trend analysis
  3. Alert on critical issues - Notify the team when critical problems are detected
  4. Auto-remediate safe issues - Fix common, low-risk problems automatically
  5. Version configuration - Track configuration changes over time
  6. Test in isolation - Test each component independently
  7. Document expected state - Keep clear documentation of the proper configuration
  8. Fail fast - Stop deployments if critical issues are found

Integration with Garden CLI

Use Garden commands for auditing:

bash
# Run full system audit
# NOTE(review): presumably the CLI counterpart of run_full_system_audit() — confirm against the Garden CLI.
garden system health

# Audit specific component
# NOTE(review): "db" looks like the component selector here — confirm which other components are accepted.
garden db audit

# Auto-remediate issues
# NOTE(review): presumably applies the same fixes as auto_remediate_issues(); check what it modifies before running.
garden system repair

Audit Reports

Export Audit Reports

Save audit results for historical analysis:

python
def export_audit_report(report, format="json"):
    """Export audit report to file.

    The file is written to ``.claude/logs/audits`` (relative to the
    current working directory, created if needed) and named
    ``audit_<YYYYmmdd_HHMMSS>`` with a ``.json`` or ``.md`` suffix.

    Args:
        report: The audit report. For "json" it must be JSON-serializable.
        format: "json" (default) or "markdown".

    Returns:
        ``pathlib.Path`` of the file that was written.

    Raises:
        ValueError: If ``format`` is neither "json" nor "markdown".
    """
    # Validate before touching the filesystem; previously an unknown format
    # fell through to the return statement and raised UnboundLocalError.
    if format not in ("json", "markdown"):
        raise ValueError(
            f"Unsupported audit report format: {format!r} "
            "(expected 'json' or 'markdown')"
        )

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path(".claude/logs/audits")
    output_dir.mkdir(parents=True, exist_ok=True)

    if format == "json":
        output_file = output_dir / f"audit_{timestamp}.json"
        with open(output_file, 'w', encoding="utf-8") as f:
            json.dump(report, f, indent=2)

    else:
        output_file = output_dir / f"audit_{timestamp}.md"
        # Render first so a rendering failure does not leave an empty file.
        rendered = format_audit_report_markdown(report)
        with open(output_file, 'w', encoding="utf-8") as f:
            f.write(rendered)

    return output_file