Orchestration

SKILL.md

Workflow Orchestration Skill

Description

Manages Apache Airflow DAGs for orchestrating OpenGov Harvester extraction workflows.

Triggers

  • "start airflow"
  • "trigger dag"
  • "check task status"
  • "airflow orchestration"
  • "schedule extraction"

Airflow Architecture

Deployment

Method: Docker Compose
Services: Webserver, Scheduler, Postgres, Redis
Web UI: http://localhost:8080
Credentials: airflow / airflow (default)
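To check from a script that the stack is up: the Airflow 2.x webserver serves a `/health` endpoint that reports the metadata database and scheduler status as JSON. The following is a minimal standard-library sketch that assumes the default URL above. `is_healthy` and `check_airflow` are illustrative names and are not part of this project.

```python
import json
import urllib.request

# Assumption: Airflow 2.x webserver on the default URL used in this skill.
HEALTH_URL = "http://localhost:8080/health"


def is_healthy(payload: dict) -> bool:
    """True when both the metadata DB and the scheduler report 'healthy'."""
    db_ok = payload.get("metadatabase", {}).get("status") == "healthy"
    scheduler_ok = payload.get("scheduler", {}).get("status") == "healthy"
    return db_ok and scheduler_ok


def check_airflow(url: str = HEALTH_URL, timeout: float = 5.0) -> bool:
    """Fetch the health endpoint; connection or JSON errors count as unhealthy."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return is_healthy(json.load(resp))
    except (OSError, ValueError):  # URLError/timeouts are OSError; bad JSON is ValueError
        return False


if __name__ == "__main__":
    print("healthy" if check_airflow() else "unhealthy")
```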

DAG Structure

Core DAGs:

  1. opengov_master_pipeline: E2E orchestration (Inventory → Extraction → Report)
  2. opengov_inventory_queue_manager: Daily project list refresh
  3. opengov_queue_worker_pool: Parallel extraction with dynamic task mapping

Common Operations

Start/Stop Airflow

Start Services:

bash
./scripts/airflow_start.sh

# Or manually
docker-compose -f airflow/docker-compose.yml up -d

Stop Services:

bash
./scripts/airflow_stop.sh

# Or manually
docker-compose -f airflow/docker-compose.yml down

Check Status:

bash
docker ps | grep airflow

# View logs
./scripts/airflow_logs.sh

DAG Management

List DAGs:

bash
docker exec -it airflow-scheduler airflow dags list

Trigger DAG:

bash
# Trigger with default config
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline

# Trigger with parameters
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline \
  --conf '{"backend": "supabase", "batch_size": 10}'
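When you trigger from Python (for example, from a wrapper script), building the `--conf` payload with `json.dumps` saves you from hand-quoting JSON. This sketch assumes the `airflow-scheduler` container name used throughout. `trigger_cmd` is a hypothetical helper. It omits `-it` because the command needs no interactive terminal.

```python
import json
import shlex
from typing import List, Optional

SCHEDULER_CONTAINER = "airflow-scheduler"  # container name used throughout this skill


def trigger_cmd(dag_id: str, conf: Optional[dict] = None) -> List[str]:
    """Build argv for `airflow dags trigger`, serializing conf with json.dumps."""
    cmd = ["docker", "exec", SCHEDULER_CONTAINER, "airflow", "dags", "trigger", dag_id]
    if conf:
        cmd += ["--conf", json.dumps(conf)]
    return cmd


if __name__ == "__main__":
    cmd = trigger_cmd("opengov_master_pipeline", {"backend": "supabase", "batch_size": 10})
    # Shell-quoted form, for copy-pasting into a terminal
    print(shlex.join(cmd))
```

You can pass the list straight to `subprocess.run(cmd, check=True)`. No shell is involved, so values that contain quotes or spaces arrive intact.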

Pause/Unpause DAG:

bash
docker exec -it airflow-scheduler airflow dags pause opengov_master_pipeline
docker exec -it airflow-scheduler airflow dags unpause opengov_master_pipeline

Task Management

List Tasks:

bash
docker exec -it airflow-scheduler airflow tasks list opengov_master_pipeline

Test Task:

bash
docker exec -it airflow-scheduler airflow tasks test \
  opengov_master_pipeline \
  extract_opportunities \
  2026-01-25

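View Task Logs:

The Airflow CLI has no command for printing task logs. Use the task instance's Log tab in the web UI, or read the files directly:

bash
# Assumes the default log folder /opt/airflow/logs inside the container
docker exec airflow-scheduler find /opt/airflow/logs \
  -path '*opengov_master_pipeline*extract_opportunities*' -name '*.log'

# Print one attempt's log, using a path from the output above
docker exec airflow-scheduler cat <log_path>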

Monitoring

View DAG Runs:

bash
docker exec -it airflow-scheduler airflow dags list-runs \
  -d opengov_master_pipeline \
  --state running

View Task Instances:

bash
docker exec -it airflow-scheduler airflow tasks states-for-dag-run \
  opengov_master_pipeline \
  manual__2026-01-25T12:00:00+00:00
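The second argument is the run id. On Airflow 2.x, runs started with `airflow dags trigger` get an id of the form `manual__<logical date in ISO 8601>`. The hypothetical `manual_run_id` helper below reproduces that shape. Real ids usually include microseconds, so copy the exact value from `airflow dags list-runs` rather than reconstructing it.

```python
from datetime import datetime, timezone


def manual_run_id(logical_date: datetime) -> str:
    """Mimic the manual__<ISO-8601 timestamp> run_id shape (Airflow 2.x).

    Naive datetimes are rejected because the run_id embeds the UTC offset.
    """
    if logical_date.tzinfo is None:
        raise ValueError("logical_date must be timezone-aware")
    return f"manual__{logical_date.astimezone(timezone.utc).isoformat()}"
```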

DAG Configuration

Master Pipeline DAG

python
# airflow/dags/opengov_master_pipeline.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    description='Complete OpenGov extraction pipeline',
    schedule='@daily',  # Airflow 2.4+ name; schedule_interval is deprecated
    catchup=False,
    tags=['opengov', 'extraction', 'master'],
) as dag:

    # Task 1: Inventory collection
    collect_inventory = BashOperator(
        task_id='collect_inventory',
        bash_command='python /app/scripts/etl/fetch_project_data.py',
    )

    # Task 2: Sequential extraction
    extract_opportunities = BashOperator(
        task_id='extract_opportunities',
        bash_command='timeout 600 python /app/scripts/etl/step_3_sequential_extraction.py',
    )

    # Task 3: Generate report
    generate_report = BashOperator(
        task_id='generate_report',
        bash_command='python /app/scripts/generate_report.py',
    )

    # Define task dependencies
    collect_inventory >> extract_opportunities >> generate_report
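With `retries: 3`, a 5-minute `retry_delay`, and the `timeout 600` wrapper on `extract_opportunities`, a failing task can hold up the pipeline for a long time before it finally fails. The arithmetic below gives a worst-case estimate. It assumes every attempt hits the timeout, retry delays stay fixed (no exponential backoff, the default), and scheduler latency is ignored. `worst_case_task_time` is an illustrative name.

```python
from datetime import timedelta

# Values copied from default_args and the `timeout 600` wrapper above.
RETRIES = 3
RETRY_DELAY = timedelta(minutes=5)
ATTEMPT_TIMEOUT = timedelta(seconds=600)


def worst_case_task_time(retries: int, retry_delay: timedelta, attempt_timeout: timedelta) -> timedelta:
    """(retries + 1) attempts that all time out, separated by `retries` fixed delays."""
    return (retries + 1) * attempt_timeout + retries * retry_delay


if __name__ == "__main__":
    print(worst_case_task_time(RETRIES, RETRY_DELAY, ATTEMPT_TIMEOUT))  # 0:55:00
```

Four 10-minute attempts plus three 5-minute delays add up to 55 minutes. Keep that figure in mind when tuning `timeout` and `retries`.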

Dynamic Task Mapping

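python
# Dynamic fan-out for parallel processing
from airflow import DAG
from airflow.decorators import task


@task
def get_project_ids():
    """Fetch the IDs of projects that have not been extracted yet"""
    import sqlite3
    conn = sqlite3.connect('/app/data/db/opengov_state.db')
    try:
        cursor = conn.execute(
            "SELECT project_id FROM opengov_projects WHERE extracted = 0 LIMIT 10"
        )
        # Plain strings: the list goes through XCom, one mapped instance per element
        return [str(row[0]) for row in cursor.fetchall()]
    finally:
        conn.close()  # sqlite3's "with conn:" only commits; it does not close


@task
def process_project(project_id: str):
    """Process a single project; a non-zero exit fails this mapped instance"""
    import subprocess
    subprocess.run(
        ['python', '/app/scripts/etl/extract_single_project.py',
         '--project-id', project_id],
        check=True,  # without check=True, script failures would be reported as success
    )


with DAG(...) as dag:  # DAG arguments omitted
    project_ids = get_project_ids()
    process_project.expand(project_id=project_ids)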
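To exercise the per-project script outside Airflow (for example, while debugging `extract_single_project.py`), a bounded thread pool produces a similar fan-out locally. This is an Airflow-free analogue, not how Airflow runs mapped tasks: there, the executor, pools, and `max_active_tasks` set the limit. `fan_out` is a hypothetical helper, and `max_workers=5` mirrors the `max_workers` value in the DAG parameters.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple


def fan_out(project_ids: List[str], work: Callable[[str], None],
            max_workers: int = 5) -> Tuple[List[str], Dict[str, str]]:
    """Run `work` for each id with at most `max_workers` in flight.

    Returns (sorted succeeded ids, {failed id: error message}). One failure
    does not stop the others, loosely like independent mapped task instances.
    """
    succeeded: List[str] = []
    failed: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(work, pid): pid for pid in project_ids}
        for fut in as_completed(futures):
            pid = futures[fut]
            try:
                fut.result()
                succeeded.append(pid)
            except Exception as exc:  # record the failure and keep going
                failed[pid] = str(exc)
    return sorted(succeeded), failed
```

If `work` calls `subprocess.run([...], check=True)` for one project ID, `fan_out` reports which projects failed while letting the rest finish.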

Related Rules

  • airflow-dynamic-mapping.md: Dynamic task mapping patterns
  • airflow-orchestrate-only.md: Keep DAGs lightweight, delegate work
  • airflow-concurrency-control.md: Manage parallelism
  • airflow-error-handling.md: Partial failure recovery
  • airflow-idempotent-backfills.md: Safe re-runs

Configuration

Environment Variables

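bash
# In docker-compose.yml or .env
AIRFLOW__CORE__EXECUTOR=LocalExecutor
# Airflow 2.3+ reads this from [database]; AIRFLOW__CORE__SQL_ALCHEMY_CONN is the deprecated name
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES=False
# Shows the full configuration (including credentials) in the UI; local development only
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True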
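Each variable follows Airflow's `AIRFLOW__<SECTION>__<KEY>` convention and overrides option `<key>` in section `[<section>]` of `airflow.cfg`. Here is a small sketch of that mapping (`env_to_config` is an illustrative name):

```python
from typing import Tuple


def env_to_config(name: str) -> Tuple[str, str]:
    """Split AIRFLOW__<SECTION>__<KEY> into the lowercase ('section', 'key') pair."""
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__<SECTION>__<KEY>: {name}")
    return section.lower(), key.lower()
```

To confirm the value Airflow actually resolved, run `docker exec airflow-scheduler airflow config get-value core executor`.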

DAG Parameters

json
{
  "backend": "supabase",
  "batch_size": 10,
  "max_workers": 5,
  "timeout_seconds": 600,
  "enable_anti_detection": true
}
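These keys reach tasks as `dag_run.conf`, and any key omitted from `--conf` is simply absent. The sketch below shows one way to apply defaults and catch misspelled keys. `resolve_params` is hypothetical, and its defaults just copy the example payload above, so they may not match what the extraction scripts assume.

```python
from typing import Any, Dict, Optional

# Example defaults copied from the payload above (assumption, not project config).
DEFAULT_PARAMS: Dict[str, Any] = {
    "backend": "supabase",
    "batch_size": 10,
    "max_workers": 5,
    "timeout_seconds": 600,
    "enable_anti_detection": True,
}


def resolve_params(conf: Optional[Dict[str, Any]],
                   defaults: Dict[str, Any] = DEFAULT_PARAMS) -> Dict[str, Any]:
    """Overlay dag_run.conf on the defaults; unknown keys (likely typos) raise KeyError."""
    conf = conf or {}
    unknown = set(conf) - set(defaults)
    if unknown:
        raise KeyError(f"unknown parameter(s): {sorted(unknown)}")
    merged = dict(defaults)  # copy so the defaults are never mutated
    merged.update(conf)
    return merged
```

Inside a task, call `params = resolve_params(context['dag_run'].conf)`.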

Monitoring & Alerting

Web UI Monitoring

  1. Open http://localhost:8080
  2. Go to the "DAGs" view
  3. Click a DAG name to see its runs
  4. Click a run to see its task details
  5. Open a task's logs for debugging

CLI Monitoring

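bash
# Watch DAG runs in real time (no -it: the command is non-interactive)
watch -n 5 'docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state running'

# Find failed runs, then list the task states of one of them
docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state failed
docker exec airflow-scheduler airflow tasks states-for-dag-run opengov_master_pipeline <run_id>

# Explain why a task instance is not running (unmet dependencies)
docker exec airflow-scheduler airflow tasks failed-deps opengov_master_pipeline extract_opportunities <run_id>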

Logging

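Location: airflow/logs/
Format: depends on the Airflow version and the log_filename_template setting. Airflow 2.3+ defaults to dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<try_number>.log (mapped tasks get an extra map_index=<n>/ directory). Earlier releases used <dag_id>/<task_id>/<execution_date>/<try_number>.log.

bash
# Tail logs (Airflow 2.3+ default layout)
tail -f "airflow/logs/dag_id=opengov_master_pipeline/run_id=manual__2026-01-25T12:00:00+00:00/task_id=extract_opportunities/attempt=1.log"

# Tail logs (pre-2.3 layout)
tail -f airflow/logs/opengov_master_pipeline/extract_opportunities/2026-01-25T12:00:00+00:00/1.log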
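Mapped tasks such as `process_project` write one log file per mapped instance, which makes the paths tedious to type. The sketch below builds the path for the Airflow 2.3+ default `log_filename_template`. `task_log_path` is hypothetical. If the project overrides the template or runs an older release, the layout will differ.

```python
from pathlib import Path
from typing import Optional


def task_log_path(base: str, dag_id: str, task_id: str, run_id: str,
                  try_number: int, map_index: Optional[int] = None) -> Path:
    """Default per-attempt log path on Airflow 2.3+ (template left unchanged)."""
    path = Path(base) / f"dag_id={dag_id}" / f"run_id={run_id}" / f"task_id={task_id}"
    if map_index is not None and map_index >= 0:
        path = path / f"map_index={map_index}"  # only mapped task instances
    return path / f"attempt={try_number}.log"
```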

Troubleshooting

DAG Not Appearing

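Symptoms: DAG not visible in the UI
Solutions:

  1. Check DAG file syntax: python -m py_compile airflow/dags/my_dag.py
  2. List DAG import errors: docker exec airflow-scheduler airflow dags list-import-errors
  3. Check scheduler logs: docker logs airflow-scheduler
  4. Verify DAG folder: docker exec airflow-scheduler ls /opt/airflow/dags
  5. Wait for the next DAG-folder scan: the scheduler picks up new files every [scheduler] dag_dir_list_interval seconds (300 by default); restart the scheduler to force a rescan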
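The syntax check in step 1 covers one file at a time. The sketch below runs every `.py` file under the DAG folder through Python's built-in `compile`, which writes no `.pyc` files, and reports syntax errors. `find_syntax_errors` is a hypothetical helper. It cannot catch import errors (missing packages, bad Airflow imports), so check the scheduler logs for those.

```python
from pathlib import Path
from typing import Dict


def find_syntax_errors(dag_folder: str) -> Dict[str, str]:
    """Compile every .py file under dag_folder; return {path: 'line N: message'}."""
    errors: Dict[str, str] = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        source = path.read_text(encoding="utf-8")
        try:
            compile(source, str(path), "exec")  # parse only; nothing is executed
        except SyntaxError as exc:  # includes IndentationError
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

For example, run `find_syntax_errors("airflow/dags")` from the repository root and print each entry.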

Task Stuck in Running

Symptoms: Task shows "running" but is not progressing
Solutions:

  1. Check the task logs for errors
  2. Verify the task process is running: docker exec airflow-scheduler ps aux
  3. Clear the stuck task instance so it is rescheduled: docker exec airflow-scheduler airflow tasks clear <dag_id> -t <task_id_regex> -s <start_date> -e <end_date> -y
  4. Restart the scheduler: docker restart airflow-scheduler

Connection to Worker Lost

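Symptoms: Task fails with a "Worker lost" error
Applies to: setups running the CeleryExecutor with a separate airflow-worker container. With LocalExecutor (as configured under Environment Variables), tasks run inside the scheduler container, so check docker logs airflow-scheduler instead.
Solutions:

  1. Check worker logs: docker logs airflow-worker
  2. Increase the worker timeout in the config
  3. Check resource limits: docker stats
  4. Restart the worker: docker restart airflow-worker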

Best Practices

DAG Design

  • ✅ Keep DAGs idempotent (safe to re-run)
  • ✅ Use task groups for logical grouping
  • ✅ Set appropriate retries and timeouts
  • ✅ Use XCom for small data passing only
  • ✅ Delegate heavy work to external scripts

Error Handling

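python
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException


@task
def process_with_validation(data):
    # validate() and process() stand in for project-specific helpers
    if not validate(data):
        # Marks the task "skipped" rather than "failed" (no retries); downstream
        # tasks with the default all_success trigger rule are skipped as well
        raise AirflowSkipException("Invalid data, skipping")
    return process(data)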

Resource Management

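python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Limit concurrent runs and tasks. These are DAG arguments, not task arguments,
# so putting them in default_args does not apply them.
with DAG(
    'opengov_master_pipeline',
    start_date=datetime(2026, 1, 1),
    max_active_runs=1,   # at most one active run of this DAG
    max_active_tasks=5,  # at most 5 running tasks across this DAG's active runs
) as dag:
    # Set task-level concurrency
    extract_task = BashOperator(
        task_id='extract',
        bash_command='...',
        pool='extraction_pool',  # Custom pool with limited slots
    )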

Advanced Features

Branching

python
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


def choose_backend(**context):
    # Return the task_id of the branch to follow; the other branch is skipped
    backend = context['dag_run'].conf.get('backend', 'sqlite')
    return f'extract_to_{backend}'


branch = BranchPythonOperator(
    task_id='choose_backend',
    python_callable=choose_backend,
)

extract_sqlite = BashOperator(task_id='extract_to_sqlite', bash_command='...')  # command omitted
extract_supabase = BashOperator(task_id='extract_to_supabase', bash_command='...')  # command omitted

branch >> [extract_sqlite, extract_supabase]

Dynamic DAG Generation

python
# Generate multiple similar DAGs
for project_type in ['federal', 'state', 'local']:
    dag_id = f'opengov_extract_{project_type}'

    with DAG(dag_id, ...) as dag:
        extract = BashOperator(
            task_id='extract',
            bash_command=f'python extract.py --type {project_type}',
        )

    globals()[dag_id] = dag

Performance Optimization

Parallelism Settings

bash
# Environment variables (or the matching [core] options in airflow.cfg)
AIRFLOW__CORE__PARALLELISM=32  # Global parallelism
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16  # Per-DAG concurrency (was DAG_CONCURRENCY before Airflow 2.2)
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3  # Active DAG runs
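These limits nest rather than add up. For one DAG, a rough upper bound is the smallest of three values: the global `parallelism`, the per-DAG cap (`max_active_tasks_per_dag`, formerly `dag_concurrency`), and, for tasks assigned to a pool, that pool's slots. `MAX_ACTIVE_RUNS_PER_DAG` does not multiply the per-DAG cap, because that cap counts running tasks across all active runs of the DAG. The sketch below covers only this arithmetic. `max_concurrent_tasks` is illustrative, and it assumes each task occupies one pool slot (the default).

```python
from typing import Optional


def max_concurrent_tasks(parallelism: int, per_dag_limit: int,
                         pool_slots: Optional[int] = None) -> int:
    """Upper bound on simultaneously running tasks of one DAG.

    The real number can be lower: executor capacity and other DAGs
    competing for the same global slots are not modeled here.
    """
    limits = [parallelism, per_dag_limit]
    if pool_slots is not None:
        limits.append(pool_slots)
    return min(limits)
```

With the values above, tasks in a 5-slot `extraction_pool` run at most 5 at a time, and the DAG as a whole runs at most 16.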

Task Pools

bash
# Create pool with 5 slots
docker exec airflow-scheduler airflow pools set extraction_pool 5 "Extraction task pool"

python
# Assign tasks to pool
extract_task = BashOperator(
    task_id='extract',
    bash_command='...',
    pool='extraction_pool',
)