ETL Pipeline Skill

Description

Manages the complete Extract-Transform-Load (ETL) pipeline for OpenGov procurement data.

Triggers

  • "run etl"
  • "extract opportunities"
  • "process project data"
  • "start extraction"
  • "run pipeline"

Capabilities

  1. Authentication & Session Management

    • Establishes authenticated session with OpenGov
    • Manages session persistence across runs
    • Handles session refresh and token management
  2. Project Inventory Collection

    • Discovers all available projects
    • Populates opengov_projects table
    • Tracks extraction status per project
  3. Opportunity Extraction

    • Extracts opportunity details from project pages
    • Downloads associated documents (PDFs, DOCX)
    • Stores structured data in SQLite and Supabase
  4. Incremental Updates

    • Version-based change detection
    • Only re-extracts modified projects
    • Efficient delta processing
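
The version-based change detection listed under Incremental Updates can be sketched roughly as follows. This is a minimal illustration, not the pipeline's actual code: only the opengov_projects table name comes from this document, while the project_id and extracted_version columns and the projects_needing_extraction helper are hypothetical.

```python
import sqlite3


def projects_needing_extraction(conn, remote_versions):
    """Return sorted IDs of projects that are new or whose version changed.

    remote_versions maps project_id -> version string seen upstream.
    """
    rows = conn.execute(
        "SELECT project_id, extracted_version FROM opengov_projects"
    ).fetchall()
    known = dict(rows)
    # A project is stale if it was never extracted or its version differs.
    return sorted(
        pid for pid, version in remote_versions.items()
        if known.get(pid) != version
    )


# Demo against an in-memory database using the hypothetical schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE opengov_projects ("
    "project_id TEXT PRIMARY KEY, extracted_version TEXT)"
)
conn.executemany(
    "INSERT INTO opengov_projects VALUES (?, ?)",
    [("p1", "v1"), ("p2", "v3")],
)
stale = projects_needing_extraction(conn, {"p1": "v1", "p2": "v4", "p3": "v1"})
print(stale)  # ['p2', 'p3']
```

Only p2 (changed version) and p3 (not yet recorded) are returned, so an unchanged project such as p1 is skipped on re-runs.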

Commands

Full Pipeline

bash
# Run complete ETL pipeline
python main.py run

# Or via unified engine
python unified_engine.py --all

Individual Steps

bash
# Step 1: Authentication
python scripts/etl/step_1_auth_session.py

# Step 2: Project inventory
python scripts/etl/fetch_project_data.py

# Step 3: Sequential extraction
python scripts/etl/step_3_sequential_extraction.py

# Step 4: Gateway API extraction (if available)
python scripts/etl/gateway_api_extractor.py

With Timeouts

bash
# Long-running extraction with timeout
timeout 600 python scripts/etl/step_3_sequential_extraction.py

# Quick test run
timeout 30 python scripts/etl/step_1_auth_session.py
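
Where the timeout utility is not available, a similar limit can be applied from Python itself. The sketch below assumes a hypothetical run_step helper built on the standard library's subprocess.run; returning 124 on expiry is a choice made here to mirror the exit status GNU timeout uses.

```python
import subprocess
import sys


def run_step(cmd, seconds):
    """Run cmd and return its exit code, or 124 if it exceeds the time limit."""
    try:
        return subprocess.run(cmd, timeout=seconds).returncode
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising; 124 mirrors GNU timeout.
        return 124


# Self-contained demo using short-lived child Python processes.
slow = [sys.executable, "-c", "import time; time.sleep(5)"]
fast = [sys.executable, "-c", "print('ok')"]
slow_code = run_step(slow, 1)   # exceeds the 1-second limit
fast_code = run_step(fast, 30)  # finishes well within the limit
```

With this helper, run_step([sys.executable, "scripts/etl/step_3_sequential_extraction.py"], 600) would correspond to the first command above.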

Related Rules

  • etl-atomic-writes.md: Ensure atomic database updates
  • etl-retry-backoff.md: Handle transient failures
  • etl-idempotency-patterns.md: Safe re-runs
  • etl-incremental-loads.md: Efficient delta processing
  • data-storage-formats.md: Canonical output paths
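
The contents of these rule files are not reproduced here. As a generic illustration of the kind of transient-failure handling that etl-retry-backoff.md refers to, the following sketch retries a callable with exponential backoff. The retry_with_backoff helper, its parameters, and the flaky demo function are all hypothetical.

```python
import time


def retry_with_backoff(func, attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call func until it succeeds, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception:  # real code should catch only transient error types
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))


# Demo: a hypothetical operation that fails twice before succeeding.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "done"


delays = []  # record the requested delays instead of actually sleeping
result = retry_with_backoff(flaky, sleep=delays.append)
```

Passing sleep as a parameter keeps the demo instantaneous; in normal use the default time.sleep applies.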

Prerequisites

  • Supabase running locally: supabase start
  • Environment variables set in config/env/.env
  • SQLite database initialized
  • Playwright installed, including its browser binaries: playwright install
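
Some of these prerequisites can be checked before a run. The sketch below is an assumption-laden preflight helper, not part of the pipeline: check_prerequisites is a hypothetical name, the SQLite path is taken from the Output Locations section, and the check only confirms that the supabase and playwright commands are on PATH. It does not verify that Supabase is actually running or that browsers are installed.

```python
import shutil
import tempfile
from pathlib import Path


def check_prerequisites(root="."):
    """Return a list of problems found; an empty list means every check passed."""
    root = Path(root)
    problems = []
    # Files this document says must exist before the pipeline runs.
    for rel in ("config/env/.env", "data/db/opengov_state.db"):
        if not (root / rel).is_file():
            problems.append(f"missing {rel}")
    # Command-line tools named in the prerequisites.
    for tool in ("supabase", "playwright"):
        if shutil.which(tool) is None:
            problems.append(f"{tool} not found on PATH")
    return problems


# Demo: an empty directory is missing both required files.
with tempfile.TemporaryDirectory() as empty_root:
    problems = check_prerequisites(empty_root)
print(problems)
```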

Output Locations

  • SQLite: data/db/opengov_state.db
  • Supabase: Remote tables in opengov schema
  • Documents: data/exports/project-artifacts/<project_id>/
  • Logs: logs/opengov_<timestamp>.log

Success Indicators

  • ✅ Session established without triggering a Turnstile challenge
  • ✅ All projects inventoried
  • ✅ Opportunities extracted with complete metadata
  • ✅ Documents downloaded successfully
  • ✅ Data synced to both SQLite and Supabase

Troubleshooting

  • Turnstile triggered: See .agent/skills/opengov-harvester/anti-detection/SKILL.md
  • Database locked: Check that WAL mode is enabled
  • Missing data: Verify the per-project extraction status in the database
  • Session expired: Re-run the authentication step (scripts/etl/step_1_auth_session.py)