Workflow Orchestration Skill
Description
Manages Apache Airflow DAGs for orchestrating OpenGov Harvester extraction workflows.
Triggers
- "start airflow"
- "trigger dag"
- "check task status"
- "airflow orchestration"
- "schedule extraction"
Airflow Architecture
Deployment
Method: Docker Compose
Services: Webserver, Scheduler, Postgres, Redis
Web UI: http://localhost:8080
Credentials: airflow / airflow (default)
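A readiness check after `docker-compose up` can be scripted by polling the webserver's health endpoint. This is a sketch that assumes Airflow 2.x, where `GET http://localhost:8080/health` returns JSON whose top-level entries (`metadatabase`, `scheduler`, and so on) each carry a `status` field. Both function names are hypothetical.

```python
import json
import urllib.request

HEALTH_URL = "http://localhost:8080/health"  # webserver address from the Deployment notes


def unhealthy_components(payload: dict) -> list[str]:
    """Return the names of components whose status is not 'healthy'.

    Entries whose status is null are ignored.
    """
    bad = []
    for name, info in payload.items():
        status = (info or {}).get("status")
        if status is not None and status != "healthy":
            bad.append(name)
    return sorted(bad)


def check_airflow(url: str = HEALTH_URL, timeout: float = 5.0) -> list[str]:
    """Fetch /health and return unhealthy component names (an empty list means OK)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return unhealthy_components(json.load(resp))
```

To wait for the stack, call `check_airflow()` in a loop with a short sleep until it returns `[]`.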
DAG Structure
Core DAGs:
- opengov_master_pipeline: End-to-end orchestration (Inventory → Extraction → Report)
- opengov_inventory_queue_manager: Daily project-list refresh
- opengov_queue_worker_pool: Parallel extraction with dynamic task mapping
Common Operations
Start/Stop Airflow
Start Services:
bash
./scripts/airflow_start.sh
# Or manually:
docker-compose -f airflow/docker-compose.yml up -d
Stop Services:
bash
./scripts/airflow_stop.sh
# Or manually:
docker-compose -f airflow/docker-compose.yml down
Check Status:
bash
docker ps | grep airflow
# View logs
./scripts/airflow_logs.sh
DAG Management
List DAGs:
bash
docker exec -it airflow-scheduler airflow dags list
Trigger DAG:
bash
# Trigger with default config
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline
# Trigger with parameters
docker exec -it airflow-scheduler airflow dags trigger opengov_master_pipeline \
--conf '{"backend": "supabase", "batch_size": 10}'
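The same parameterised trigger can also go through Airflow's stable REST API instead of `docker exec`. This sketch assumes Airflow 2.x (`/api/v1`) with basic authentication enabled for the API, for example `AIRFLOW__API__AUTH_BACKENDS` including `airflow.api.auth.backend.basic_auth`, plus the default `airflow` / `airflow` credentials. It only builds the request. Passing the result to `urllib.request.urlopen` would send it. `build_trigger_request` is a hypothetical helper.

```python
import base64
import json
import urllib.request


def build_trigger_request(
    dag_id: str,
    conf: dict,
    base_url: str = "http://localhost:8080",
    user: str = "airflow",
    password: str = "airflow",
) -> urllib.request.Request:
    """Build a POST to /api/v1/dags/{dag_id}/dagRuns that carries the run's conf."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    body = json.dumps({"conf": conf}).encode()  # json.dumps sidesteps shell quoting of --conf
    return urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


req = build_trigger_request("opengov_master_pipeline", {"backend": "supabase", "batch_size": 10})
# urllib.request.urlopen(req) would create the DAG run
```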
Pause/Unpause DAG:
bash
docker exec -it airflow-scheduler airflow dags pause opengov_master_pipeline
docker exec -it airflow-scheduler airflow dags unpause opengov_master_pipeline
Task Management
List Tasks:
bash
docker exec -it airflow-scheduler airflow tasks list opengov_master_pipeline
Test Task:
bash
docker exec -it airflow-scheduler airflow tasks test \
  opengov_master_pipeline \
  extract_opportunities \
  2026-01-25
View Task Logs:
bash
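# The Airflow 2.x CLI has no `airflow tasks logs` subcommand; locate the task's log files instead
docker exec airflow-scheduler find /opt/airflow/logs \
  -path '*opengov_master_pipeline*extract_opportunities*' -name '*.log'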
Monitoring
View DAG Runs:
bash
docker exec -it airflow-scheduler airflow dags list-runs \
  -d opengov_master_pipeline \
  --state running
View Task Instances:
bash
docker exec -it airflow-scheduler airflow tasks states-for-dag-run \
  opengov_master_pipeline \
  manual__2026-01-25T12:00:00+00:00
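In Airflow 2.x, manually triggered runs get IDs of the form `manual__<logical date in ISO 8601>`, as in the example above. Scheduled runs use the `scheduled__` prefix. When scripting `states-for-dag-run` for a known trigger time, the ID can be rebuilt with the standard library. Runs triggered "now" usually include microseconds in the timestamp, so if you are unsure, copy the ID from `dags list-runs`. The helper name is hypothetical.

```python
from datetime import datetime, timezone


def manual_run_id(logical_date: datetime) -> str:
    """Rebuild an Airflow 2.x manual run ID from a timezone-aware logical date."""
    if logical_date.tzinfo is None:
        raise ValueError("logical_date must be timezone-aware (Airflow stores UTC)")
    return f"manual__{logical_date.astimezone(timezone.utc).isoformat()}"


print(manual_run_id(datetime(2026, 1, 25, 12, 0, tzinfo=timezone.utc)))
# manual__2026-01-25T12:00:00+00:00
```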
DAG Configuration
Master Pipeline DAG
python
# airflow/dags/opengov_master_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'opengov_master_pipeline',
    default_args=default_args,
    description='Complete OpenGov extraction pipeline',
    schedule_interval='@daily',  # deprecated in Airflow 2.4+ in favour of schedule='@daily'
    catchup=False,
    tags=['opengov', 'extraction', 'master'],
) as dag:
    # Task 1: Inventory collection
    collect_inventory = BashOperator(
        task_id='collect_inventory',
        bash_command='python /app/scripts/etl/fetch_project_data.py',
    )
    # Task 2: Sequential extraction (the shell `timeout` kills it after 600 s)
    extract_opportunities = BashOperator(
        task_id='extract_opportunities',
        bash_command='timeout 600 python /app/scripts/etl/step_3_sequential_extraction.py',
    )
    # Task 3: Generate report
    generate_report = BashOperator(
        task_id='generate_report',
        bash_command='python /app/scripts/generate_report.py',
    )
    # Define task dependencies
    collect_inventory >> extract_opportunities >> generate_report
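With `retries=3` and `retry_delay=timedelta(minutes=5)`, each task can make up to four attempts. For `extract_opportunities`, which the shell `timeout 600` caps per attempt, the worst-case wall time before the task finally fails can be bounded with simple arithmetic. The sketch ignores scheduler latency and assumes `retry_exponential_backoff` keeps its default of `False`. The function name is hypothetical.

```python
from datetime import timedelta


def worst_case_duration(retries: int, retry_delay: timedelta, attempt_timeout: timedelta) -> timedelta:
    """Upper bound from first attempt to final failure, ignoring scheduler latency."""
    attempts = retries + 1  # the first try plus one per retry
    return attempts * attempt_timeout + retries * retry_delay


bound = worst_case_duration(3, timedelta(minutes=5), timedelta(seconds=600))
print(bound)  # 0:55:00, i.e. 4 x 10 min of attempts + 3 x 5 min of waiting
```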
Dynamic Task Mapping
python
# Dynamic fan-out for parallel processing
from airflow import DAG
from airflow.decorators import task


@task
def get_project_ids():
    """Fetch the list of project IDs still to be processed."""
    import sqlite3
    from contextlib import closing
    # closing() guarantees the connection is released even if the query fails
    with closing(sqlite3.connect('/app/data/db/opengov_state.db')) as conn:
        cursor = conn.execute(
            "SELECT project_id FROM opengov_projects WHERE extracted = 0 LIMIT 10"
        )
        return [row[0] for row in cursor.fetchall()]


@task
def process_project(project_id: str):
    """Process a single project."""
    import subprocess
    # check=True raises on a non-zero exit, so the mapped task instance fails instead of passing silently
    subprocess.run(
        [
            'python', '/app/scripts/etl/extract_single_project.py',
            '--project-id', project_id,
        ],
        check=True,
    )


with DAG(...) as dag:
    project_ids = get_project_ids()
    process_project.expand(project_id=project_ids)
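The selection query in `get_project_ids` can be checked without Airflow against an in-memory SQLite database. The two-column schema below is a hypothetical minimum: only `project_id` and `extracted` come from the query above. The sketch also passes the limit as a bound parameter and adds an `ORDER BY` so each batch is deterministic. Both are changes from the original query.

```python
import sqlite3
from contextlib import closing


def fetch_pending_ids(conn: sqlite3.Connection, limit: int = 10) -> list[str]:
    """Return up to `limit` project IDs that have not been extracted yet."""
    cursor = conn.execute(
        "SELECT project_id FROM opengov_projects WHERE extracted = 0 ORDER BY project_id LIMIT ?",
        (limit,),
    )
    return [row[0] for row in cursor.fetchall()]


with closing(sqlite3.connect(":memory:")) as conn:
    # Hypothetical minimal schema, just enough to exercise the query
    conn.execute("CREATE TABLE opengov_projects (project_id TEXT PRIMARY KEY, extracted INTEGER)")
    conn.executemany(
        "INSERT INTO opengov_projects VALUES (?, ?)",
        [("p1", 1), ("p2", 0), ("p3", 0), ("p4", 0)],
    )
    pending = fetch_pending_ids(conn, limit=2)

print(pending)  # ['p2', 'p3']
```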
Related Rules
- airflow-dynamic-mapping.md: Dynamic task mapping patterns
- airflow-orchestrate-only.md: Keep DAGs lightweight, delegate work
- airflow-concurrency-control.md: Manage parallelism
- airflow-error-handling.md: Partial failure recovery
- airflow-idempotent-backfills.md: Safe re-runs
Configuration
Environment Variables
bash
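# In docker-compose.yml or .env
AIRFLOW__CORE__EXECUTOR=LocalExecutor
# In Airflow 2.3+ this option lives under [database] (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN); the [core] name is deprecated
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES=False
# Shows the full configuration, including connection strings, in the web UI; keep to local development
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True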
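Each variable follows Airflow's `AIRFLOW__{SECTION}__{KEY}` convention and overrides the matching `key` under `[section]` in `airflow.cfg`. A small hypothetical helper, for example for auditing a `.env` file, makes the mapping explicit:

```python
def env_to_cfg(name: str) -> tuple[str, str]:
    """Map AIRFLOW__SECTION__KEY to the ('section', 'key') pair used in airflow.cfg."""
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    # The section ends at the first double underscore; the key is everything after it
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__SECTION__KEY, got: {name}")
    return section.lower(), key.lower()


print(env_to_cfg("AIRFLOW__CORE__LOAD_EXAMPLES"))  # ('core', 'load_examples')
```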
DAG Parameters
json
{
"backend": "supabase",
"batch_size": 10,
"max_workers": 5,
"timeout_seconds": 600,
"enable_anti_detection": true
}
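Values passed with `--conf` reach tasks as `dag_run.conf`. Airflow does not fill in omitted keys, so each task has to choose its own fallback. One way to keep that fallback in one place is to overlay the run's conf on a defaults dict and reject unknown keys or wrong types. The default values and validation rules below are hypothetical. The keys mirror the JSON above.

```python
from typing import Optional

DEFAULT_PARAMS = {  # hypothetical defaults; keys mirror the DAG parameters above
    "backend": "sqlite",
    "batch_size": 10,
    "max_workers": 5,
    "timeout_seconds": 600,
    "enable_anti_detection": True,
}


def resolve_params(conf: Optional[dict]) -> dict:
    """Overlay a run's conf on the defaults, rejecting unknown keys and wrong types."""
    conf = conf or {}
    unknown = set(conf) - set(DEFAULT_PARAMS)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    merged = {**DEFAULT_PARAMS, **conf}
    for key, default in DEFAULT_PARAMS.items():
        # Exact type match, so e.g. "10" or True is not accepted where an int is expected
        if type(merged[key]) is not type(default):
            raise TypeError(f"{key} should be {type(default).__name__}")
    return merged


print(resolve_params({"backend": "supabase", "batch_size": 20})["batch_size"])  # 20
```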
Monitoring & Alerting
Web UI Monitoring
- Open http://localhost:8080
- Navigate to the "DAGs" view
- Click a DAG name to see its runs
- Click a run to see task details
- View logs for debugging
CLI Monitoring
bash
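# Watch DAG runs in real time (no -it: watch does not provide a TTY)
watch -n 5 'docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state running'
# View failed runs
docker exec airflow-scheduler airflow dags list-runs -d opengov_master_pipeline --state failed
# Explain why a task instance is not being scheduled (unmet dependencies)
docker exec airflow-scheduler airflow tasks failed-deps \
  opengov_master_pipeline extract_opportunities manual__2026-01-25T12:00:00+00:00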
Logging
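Location: airflow/logs/
Format (default since Airflow 2.3): dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<try_number>.log
Before Airflow 2.3 the default was <dag_id>/<task_id>/<execution_date>/<try_number>.log. A custom log_filename_template changes either layout.
bash
# Tail logs (Airflow 2.3+ default layout)
tail -f "airflow/logs/dag_id=opengov_master_pipeline/run_id=manual__2026-01-25T12:00:00+00:00/task_id=extract_opportunities/attempt=1.log"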
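Instead of typing the log path for a specific attempt, you can compute it. The hypothetical helpers below cover two layouts. The first is the Airflow 2.3+ default, `dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/[map_index=<n>/]attempt=<try>.log`, where the `map_index` segment appears only for mapped tasks. The second is the older `<dag_id>/<task_id>/<execution_date>/<try_number>.log` form. Both assume `log_filename_template` has not been customised.

```python
from pathlib import PurePosixPath
from typing import Optional


def task_log_path(
    base: str,
    dag_id: str,
    task_id: str,
    run_id: str,
    try_number: int,
    map_index: Optional[int] = None,
) -> PurePosixPath:
    """Log file path under the Airflow 2.3+ default template."""
    parts = [f"dag_id={dag_id}", f"run_id={run_id}", f"task_id={task_id}"]
    if map_index is not None and map_index >= 0:
        parts.append(f"map_index={map_index}")  # only mapped task instances get this segment
    parts.append(f"attempt={try_number}.log")
    return PurePosixPath(base, *parts)


def legacy_task_log_path(base: str, dag_id: str, task_id: str, ts: str, try_number: int) -> PurePosixPath:
    """Log file path under the pre-2.3 default template."""
    return PurePosixPath(base, dag_id, task_id, ts, f"{try_number}.log")


print(task_log_path("airflow/logs", "opengov_master_pipeline", "extract_opportunities",
                    "manual__2026-01-25T12:00:00+00:00", 1))
```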
Troubleshooting
DAG Not Appearing
Symptoms: DAG not visible in UI
Solutions:
- Check DAG file syntax (catches syntax errors only, not import errors): python -m py_compile airflow/dags/my_dag.py
- Check scheduler logs: docker logs airflow-scheduler
- Verify the DAG folder: docker exec airflow-scheduler ls /opt/airflow/dags
- Refresh DAGs in the UI: click the refresh button
Task Stuck in Running
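Symptoms: Task shows "running" but is not progressing
Solutions:
- Check the task logs for errors
- Verify the task process is running: docker exec airflow-scheduler ps aux
- Clear zombie task instances so the scheduler reruns them (-t takes a task-id regex; without -s/-e every run of the DAG is affected): docker exec airflow-scheduler airflow tasks clear <dag> -t <task_regex> -s <start_date> -e <end_date> -y
- Restart the scheduler: docker restart airflow-scheduler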
Connection to Worker Lost
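Symptoms: Task fails with a "Worker lost" error
Applies to CeleryExecutor setups only. With the LocalExecutor set under Environment Variables, tasks run as child processes of the scheduler, so there is no airflow-worker container and Redis is not used.
Solutions:
- Check worker logs: docker logs airflow-worker
- Increase the worker timeout in the config
- Check resource limits: docker stats
- Restart the worker: docker restart airflow-worker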
Best Practices
DAG Design
- ✅ Keep DAGs idempotent (safe to re-run)
- ✅ Use task groups for logical grouping
- ✅ Set appropriate retries and timeouts
- ✅ Use XCom for small data passing only
- ✅ Delegate heavy work to external scripts
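In practice, idempotency usually comes down to keyed writes: re-running a task for the same input overwrites its earlier result rather than duplicating it. The minimal sketch below uses SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` upsert, which needs SQLite 3.24+. The `extraction_results` table and its columns are hypothetical.

```python
import sqlite3


def save_result(conn: sqlite3.Connection, project_id: str, payload: str) -> None:
    """Upsert keyed on project_id, so a retried or backfilled task leaves a single row."""
    conn.execute(
        """
        INSERT INTO extraction_results (project_id, payload) VALUES (?, ?)
        ON CONFLICT(project_id) DO UPDATE SET payload = excluded.payload
        """,
        (project_id, payload),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE extraction_results (project_id TEXT PRIMARY KEY, payload TEXT)")
save_result(conn, "p1", "first run")
save_result(conn, "p1", "re-run")  # same key: updates the row instead of adding a duplicate
rows = conn.execute("SELECT project_id, payload FROM extraction_results").fetchall()
print(rows)  # [('p1', 're-run')]
```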
Error Handling
python
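from airflow.decorators import task
from airflow.exceptions import AirflowSkipException


@task
def process_with_validation(data):
    # validate() and process() stand for project-specific helpers, not Airflow APIs
    if not validate(data):
        # Marks the task "skipped" rather than "failed", so it is not retried
        raise AirflowSkipException("Invalid data, skipping")
    return process(data)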
Resource Management
python
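from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Limit concurrent tasks: these are DAG arguments, not default_args
# (default_args only feed operator parameters, so these keys have no effect there)
with DAG(
    'opengov_master_pipeline',
    start_date=datetime(2026, 1, 1),
    max_active_runs=1,   # at most one active run of this DAG
    max_active_tasks=5,  # at most 5 running tasks across its runs (Airflow 2.2+)
) as dag:
    # Set task-level concurrency
    extract_task = BashOperator(
        task_id='extract',
        bash_command='...',
        pool='extraction_pool',  # Custom pool with limited slots
    )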
Advanced Features
Branching
python
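from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


def choose_backend(**context):
    # Scheduled runs carry no trigger conf, so fall back to an empty dict
    conf = context['dag_run'].conf or {}
    backend = conf.get('backend', 'sqlite')
    # The returned value must be the task_id of a directly downstream task
    return f'extract_to_{backend}'


branch = BranchPythonOperator(
    task_id='choose_backend',
    python_callable=choose_backend,
)
extract_sqlite = BashOperator(task_id='extract_to_sqlite', bash_command='...')
extract_supabase = BashOperator(task_id='extract_to_supabase', bash_command='...')
branch >> [extract_sqlite, extract_supabase]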
Dynamic DAG Generation
python
# Generate multiple similar DAGs
for project_type in ['federal', 'state', 'local']:
    dag_id = f'opengov_extract_{project_type}'
    with DAG(dag_id, ...) as dag:
        extract = BashOperator(
            task_id='extract',
            bash_command=f'python extract.py --type {project_type}',
        )
    # Bind each DAG to its own module-level name so Airflow's DAG discovery finds it
    globals()[dag_id] = dag
Performance Optimization
Parallelism Settings
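bash
# Environment-variable form (the same keys can be set in airflow.cfg)
# Global: max task instances running at once per scheduler
AIRFLOW__CORE__PARALLELISM=32
# Per-DAG task concurrency (named DAG_CONCURRENCY before Airflow 2.2)
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16
# Default max active runs per DAG (a DAG's own max_active_runs overrides it)
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=3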
Task Pools
bash
# Create pool with 5 slots
docker exec airflow-scheduler airflow pools set extraction_pool 5 "Extraction task pool"
Assign Tasks to the Pool:
python
extract_task = BashOperator(
    task_id='extract',
    bash_command='...',
    pool='extraction_pool',
)
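A pool caps how many task instances holding it can run at once, across all DAGs. Each task takes one slot unless configured otherwise. As a rough standard-library analogy, not a description of how the scheduler is implemented, a semaphore with five permits bounds concurrency the same way. The thread pool stands in for spare scheduler capacity, and all names are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 5  # mirrors `airflow pools set extraction_pool 5 ...`
pool = threading.Semaphore(POOL_SLOTS)
lock = threading.Lock()
running = 0
peak = 0


def extract(project_id: int) -> int:
    """Stand-in task: take a slot, record concurrency, do 'work', release the slot."""
    global running, peak
    with pool:  # blocks while all slots are taken
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # simulated work
        with lock:
            running -= 1
    return project_id


# 20 tasks and 16 threads: more runnable work than pool slots
with ThreadPoolExecutor(max_workers=16) as executor:
    done = list(executor.map(extract, range(20)))

print(f"peak concurrency: {peak}")  # never exceeds POOL_SLOTS
```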