Airflow Management

SKILL.md

This skill provides expertise in managing Apache Airflow DAGs for ETL orchestration. It should be used when the user asks about "airflow dag", "create workflow", "dag management", "airflow orchestration", "schedule dags", or similar Airflow-related topics.

Airflow DAG Management and Best Practices

Master the creation, management, and orchestration of Apache Airflow DAGs for ETL pipelines with proper error handling, database wiring, and monitoring.

Standard DAG Template

Base Template with Best Practices

Every DAG should follow this standard structure:

python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Import project-specific utilities
from src.database.db_adapter import DBAdapter
from src.utils.logger import setup_logger

logger = setup_logger(__name__)

# Default arguments applied to all tasks
default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    'email_on_failure': True,  # Also requires an 'email' recipient list and SMTP configuration
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# DAG definition
dag = DAG(
    'opengov_example_workflow',
    default_args=default_args,
    description='Example workflow for OpenGov data extraction',
    schedule_interval='0 2 * * *',  # Daily at 02:00 in the default timezone (UTC unless configured); on Airflow 2.4+ prefer schedule=
    start_date=datetime(2024, 1, 1),  # Fixed start date; avoid dynamic values such as days_ago()
    catchup=False,
    tags=['opengov', 'etl', 'extraction'],
    max_active_runs=1,  # Prevent concurrent runs
)

def task_with_error_handling(**context):
    """Example task with proper error handling."""
    db = DBAdapter()

    try:
        # Task logic here
        logger.info("Starting task execution")

        # Do work (perform_extraction() is a project-specific placeholder)
        result = perform_extraction()

        # Log success
        db.log_success(
            dag_id=context['dag'].dag_id,
            task_id=context['task'].task_id,
            run_id=context['run_id'],
            result=result
        )

        return result

    except Exception as e:
        # Log error to database
        db.log_error(
            dag_id=context['dag'].dag_id,
            task_id=context['task'].task_id,
            run_id=context['run_id'],
            error=str(e)
        )

        logger.error(f"Task failed: {e}", exc_info=True)
        raise  # Re-raise to mark task as failed

    finally:
        db.close()

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=task_with_error_handling,
    dag=dag,
)

# With a single task there are no dependencies to declare;
# chain additional tasks with >> (e.g. extract_task >> transform_task)
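
With `retries=3`, `retry_delay` of 5 minutes, `retry_exponential_backoff=True` and a 30-minute `max_retry_delay`, the wait roughly doubles on each retry until it reaches the cap. The sketch below computes that base schedule in plain Python. It is an approximation, not Airflow's code: it ignores the jitter Airflow adds to exponential backoff, so real waits can differ from these values. `approximate_retry_delays` is a hypothetical helper.

```python
from datetime import timedelta


def approximate_retry_delays(
    retries: int, retry_delay: timedelta, max_retry_delay: timedelta
) -> list:
    """Approximate wait before each retry under exponential backoff.

    Retry n (1-based) waits retry_delay * 2 ** (n - 1), capped at max_retry_delay.
    Any jitter Airflow applies on top of this base delay is ignored.
    """
    delays = []
    for retry_number in range(1, retries + 1):
        base = retry_delay * (2 ** (retry_number - 1))
        delays.append(min(base, max_retry_delay))
    return delays


if __name__ == "__main__":
    # Values taken from default_args in the template above
    schedule = approximate_retry_delays(3, timedelta(minutes=5), timedelta(minutes=30))
    for number, delay in enumerate(schedule, start=1):
        print(f"retry {number}: ~{delay}")
```

For the template's settings this gives roughly 5, 10 and 20 minutes; a fourth retry would already be capped at 30 minutes.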

DAG Creation Patterns

Pattern 1: Sequential ETL Pipeline

Extract → Transform → Load pattern:

python
from airflow.operators.python import PythonOperator

# Assumes `dag` and `logger` are defined as in the base template above

def extract(**context):
    """Extract data from source."""
    logger.info("Extracting data...")
    extracted_data = []  # Placeholder: replace with real extraction logic
    return extracted_data  # The return value is pushed to XCom automatically

def transform(**context):
    """Transform extracted data."""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')
    logger.info(f"Transforming {len(data)} records...")
    transformed_data = data  # Placeholder: replace with real transformation logic
    return transformed_data

def load(**context):
    """Load data to destination."""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='transform')
    logger.info(f"Loading {len(data)} records...")
    # Load logic goes here

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)

extract_task >> transform_task >> load_task
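
Because each callable only reaches Airflow through `context['ti']`, the chain can be exercised without a scheduler. The sketch below uses a hypothetical in-memory `FakeTaskInstance` (not part of Airflow) that mimics the two behaviours the pattern relies on: a callable's return value is stored under the producing task's ID, and `xcom_pull(task_ids=...)` reads it back. `run_chain` and the toy data are likewise illustrative.

```python
from typing import Any, Callable, Dict, List, Tuple


class FakeTaskInstance:
    """Hypothetical in-memory stand-in for the XCom part of Airflow's TaskInstance."""

    def __init__(self) -> None:
        self._return_values: Dict[str, Any] = {}

    def push_return_value(self, task_id: str, value: Any) -> None:
        # Airflow keeps return values under the XCom key "return_value";
        # this stand-in simply keeps one value per task_id
        self._return_values[task_id] = value

    def xcom_pull(self, task_ids: str) -> Any:
        return self._return_values.get(task_ids)


def run_chain(steps: List[Tuple[str, Callable[..., Any]]]) -> FakeTaskInstance:
    """Run (task_id, callable) pairs in order, like extract >> transform >> load."""
    ti = FakeTaskInstance()
    for task_id, func in steps:
        result = func(ti=ti)  # Airflow passes a much larger context; only 'ti' is used here
        if result is not None:
            ti.push_return_value(task_id, result)
    return ti


# Toy callables with the same signatures as the pattern above
def extract(**context):
    return [{"id": 1, "name": " alpha "}, {"id": 2, "name": " beta "}]


def transform(**context):
    data = context["ti"].xcom_pull(task_ids="extract")
    return [{**row, "name": row["name"].strip()} for row in data]


loaded: List[dict] = []


def load(**context):
    loaded.extend(context["ti"].xcom_pull(task_ids="transform"))
```

Calling `run_chain([("extract", extract), ("transform", transform), ("load", load)])` leaves the two cleaned records in `loaded`, which a unit test can assert on directly.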

Pattern 2: Dynamic Task Mapping (Airflow 2.3+)

Process multiple items in parallel:

python
from airflow import DAG
from airflow.decorators import task

@task
def get_project_list():
    """Get list of projects to process."""
    db = DBAdapter()
    try:
        projects = db.get_pending_projects()
    finally:
        db.close()
    return [p['project_id'] for p in projects]

@task
def process_project(project_id):
    """Process a single project."""
    logger.info(f"Processing project: {project_id}")
    # Processing logic
    return {"project_id": project_id, "status": "completed"}

@task
def aggregate_results(results):
    """Aggregate all project results."""
    logger.info(f"Processed {len(results)} projects")
    # Store aggregated results

with DAG('opengov_parallel_processing', ...) as dag:
    projects = get_project_list()
    results = process_project.expand(project_id=projects)
    aggregate_results(results)
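
Conceptually, `expand()` creates one `process_project` task instance per element of the upstream list at run time, and `aggregate_results` then receives all of the mapped results. The plain-Python analogue below uses `concurrent.futures` rather than Airflow to show the same fan-out/fan-in shape; the project IDs are made up. In Airflow, the fan-out width is bounded by pool slots and settings such as the task's `max_active_tis_per_dag`, not by a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def get_project_list() -> List[str]:
    # Hypothetical stand-in for db.get_pending_projects()
    return ["p-1", "p-2", "p-3"]


def process_project(project_id: str) -> Dict[str, str]:
    return {"project_id": project_id, "status": "completed"}


def aggregate_results(results: List[Dict[str, str]]) -> Dict[str, int]:
    completed = sum(1 for r in results if r["status"] == "completed")
    return {"total": len(results), "completed": completed}


def run_mapped(max_workers: int = 4) -> Dict[str, int]:
    projects = get_project_list()  # Upstream task
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: one call per project; Executor.map yields results in input order
        results = list(pool.map(process_project, projects))
    return aggregate_results(results)  # Fan-in
```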

Pattern 3: Conditional Branching

Execute different paths based on conditions:

python
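from datetime import datetime, timedelta

from airflow.operators.python import BranchPythonOperator

def check_data_freshness(**context):
    """Decide whether to skip or proceed."""
    db = DBAdapter()
    try:
        last_update = db.get_last_update_time()
    finally:
        db.close()

    # Fresh = updated within the last 6 hours (assumes a naive local datetime, like datetime.now())
    if datetime.now() - last_update < timedelta(hours=6):
        return 'skip_extraction'  # Task ID to execute
    else:
        return 'run_extraction'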

branch_task = BranchPythonOperator(
    task_id='check_freshness',
    python_callable=check_data_freshness,
    dag=dag,
)

skip_task = PythonOperator(
    task_id='skip_extraction',
    python_callable=lambda: logger.info("Skipping - data is fresh"),
    dag=dag,
)

run_task = PythonOperator(
    task_id='run_extraction',
    python_callable=extract_data,  # Project-specific extraction callable (placeholder)
    dag=dag,
)

branch_task >> [skip_task, run_task]
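
Keeping the freshness decision in a pure function makes both branches easy to unit-test, because `now` is passed in explicitly. `choose_branch` and `MAX_AGE` are illustrative names; `check_data_freshness` could call `choose_branch(last_update, datetime.now())`. Both datetimes must be either naive or timezone-aware, since subtracting a mix of the two raises `TypeError`. If a task downstream of both branches must still run, give it `trigger_rule='none_failed_min_one_success'`. Under the default `all_success` rule it would be skipped, because one branch is always skipped.

```python
from datetime import datetime, timedelta

MAX_AGE = timedelta(hours=6)  # Freshness window used in the example above


def choose_branch(last_update: datetime, now: datetime, max_age: timedelta = MAX_AGE) -> str:
    """Return the task_id to follow: skip extraction if data is newer than max_age."""
    if now - last_update < max_age:
        return "skip_extraction"
    return "run_extraction"
```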

Database Wiring

Proper Database Connection Configuration

Resolve database settings from environment variables when the task runs, so the same DAG can target either local SQLite or Supabase Postgres:

python
import os
from src.database.db_adapter import DBAdapter

def get_database_config():
    """Get database configuration based on environment."""
    storage_target = os.getenv('DB_STORAGE_TARGET', 'sqlite')

    if storage_target == 'supabase':
        return {
            'type': 'postgres',
            'host': os.getenv('SUPABASE_DB_HOST', 'localhost'),
            'port': int(os.getenv('SUPABASE_DB_PORT', 54322)),
            'database': os.getenv('SUPABASE_DB_NAME', 'postgres'),
            'user': os.getenv('SUPABASE_DB_USER', 'postgres'),
            'password': os.getenv('SUPABASE_DB_PASSWORD'),
        }
    else:  # sqlite
        return {
            'type': 'sqlite',
            'database': os.getenv('SQLITE_DB_PATH', 'data/db/opengov_state.db'),
        }

def task_with_db(**context):
    """Task that uses database adapter."""
    config = get_database_config()
    db = DBAdapter(**config)

    try:
        # Use database
        db.execute("SELECT * FROM projects")
    finally:
        db.close()
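
Reading from an injectable mapping instead of `os.environ` directly makes the configuration logic testable with a plain dict. It also lets a missing Supabase password fail with a clear message instead of surfacing later as an authentication error. This variant (`get_database_config_from` is a hypothetical name) keeps the keys and defaults of `get_database_config()`. Unlike the original, it rejects unknown targets instead of silently falling back to SQLite.

```python
import os
from typing import Mapping


def get_database_config_from(env: Mapping[str, str] = os.environ) -> dict:
    """Same logic as get_database_config(), but reads from an injectable mapping."""
    storage_target = env.get("DB_STORAGE_TARGET", "sqlite")

    if storage_target == "supabase":
        password = env.get("SUPABASE_DB_PASSWORD")
        if not password:
            # Fail fast rather than attempting a connection that cannot authenticate
            raise ValueError("SUPABASE_DB_PASSWORD must be set when DB_STORAGE_TARGET=supabase")
        return {
            "type": "postgres",
            "host": env.get("SUPABASE_DB_HOST", "localhost"),
            "port": int(env.get("SUPABASE_DB_PORT", "54322")),
            "database": env.get("SUPABASE_DB_NAME", "postgres"),
            "user": env.get("SUPABASE_DB_USER", "postgres"),
            "password": password,
        }
    if storage_target == "sqlite":
        return {
            "type": "sqlite",
            "database": env.get("SQLITE_DB_PATH", "data/db/opengov_state.db"),
        }
    raise ValueError(f"Unsupported DB_STORAGE_TARGET: {storage_target!r}")
```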

Environment Variable Validation

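Validate required environment variables at DAG load (parse) time, so a misconfigured deployment shows up as an import error in the Airflow UI instead of failing mid-run: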

python
import os

REQUIRED_ENV_VARS = [
    'DB_STORAGE_TARGET',
    'SUPABASE_URL',
    'SQLITE_DB_PATH',
]

def validate_environment():
    """Validate required environment variables are set."""
    missing = []
    for var in REQUIRED_ENV_VARS:
        if not os.getenv(var):
            missing.append(var)

    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}"
        )

# Validate on DAG load
validate_environment()
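
The list above requires `SUPABASE_URL` and `SQLITE_DB_PATH` together, even though `get_database_config()` only ever uses one backend. If only the active backend's settings should be mandatory, the requirements can be keyed by target, as in this sketch. The per-target variable sets in `REQUIRED_BY_TARGET` are assumptions drawn from the examples in this document, and `missing_env_vars` is a hypothetical helper.

```python
import os
from typing import List, Mapping

# Assumed per-backend requirements, based on the variables used in this document
REQUIRED_BY_TARGET = {
    "sqlite": ["SQLITE_DB_PATH"],
    "supabase": ["SUPABASE_URL", "SUPABASE_DB_PASSWORD"],
}


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return required variables that are unset or empty for the active target."""
    target = env.get("DB_STORAGE_TARGET", "sqlite")
    if target not in REQUIRED_BY_TARGET:
        raise EnvironmentError(f"Unknown DB_STORAGE_TARGET: {target!r}")
    return [var for var in REQUIRED_BY_TARGET[target] if not env.get(var)]


def validate_environment(env: Mapping[str, str] = os.environ) -> None:
    """Raise at DAG load time if the active backend is not fully configured."""
    missing = missing_env_vars(env)
    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
```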

Error Handling and Logging

Centralized Error Reporting

Report every task failure to a database table (run_errors) through an on_failure_callback:

python
def log_dag_error(context):
    """Callback invoked by Airflow when a task fails."""
    db = DBAdapter()

    error_record = {
        'dag_id': context['dag'].dag_id,
        'task_id': context['task_instance'].task_id,
        'execution_date': context['logical_date'],  # 'execution_date' is deprecated since Airflow 2.2
        'error_message': str(context['exception']),
        'traceback': context['task_instance'].log_url,  # Link to the task log, which contains the traceback
        'run_id': context['run_id'],
    }

    try:
        db.insert('run_errors', error_record)
    finally:
        db.close()

# Add to default_args
default_args = {
    # ... other default arguments ...
    'on_failure_callback': log_dag_error,
}
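
The record above stores the task's log URL in its `traceback` field. If the `run_errors` table should hold the actual traceback text, it can be formatted from `context['exception']` with the standard `traceback` module. `build_error_record` is a hypothetical helper. Its keys mirror the record above, and the separate `log_url` key is an assumption about the table's columns.

```python
import traceback
from typing import Optional


def build_error_record(
    dag_id: str,
    task_id: str,
    run_id: str,
    exc: BaseException,
    log_url: Optional[str] = None,
) -> dict:
    """Build a run_errors row containing the formatted traceback of `exc`."""
    tb_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return {
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,
        "error_message": str(exc),
        "traceback": tb_text,
        "log_url": log_url,  # Assumed extra column; drop it if the table has none
    }
```

Inside the callback this could be called as `build_error_record(context['dag'].dag_id, context['task_instance'].task_id, context['run_id'], context['exception'], context['task_instance'].log_url)`.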

Comprehensive Logging

Log all significant events:

python
def logged_task(**context):
    """Task with comprehensive logging."""
    logger.info(f"Task started: {context['task_instance'].task_id}")
    logger.info(f"Run ID: {context['run_id']}")
    logger.info(f"Logical date: {context['logical_date']}")

    start_time = datetime.now()

    try:
        # Task work
        result = do_work()

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Task completed in {duration}s")

        return result

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.exception(f"Task failed after {duration}s: {e}")  # Also logs the traceback
        raise
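
The start/stop bookkeeping in `logged_task` can be factored into a reusable context manager. This sketch (`log_duration` is a hypothetical helper) uses `time.monotonic()`, which is unaffected by system clock changes, and `logger.exception`, which records the traceback along with the message. The yielded dict receives the elapsed seconds so callers can reuse the measurement.

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def log_duration(logger: logging.Logger, label: str) -> Iterator[dict]:
    """Log how long the wrapped block took; the yielded dict receives 'seconds'."""
    timing: dict = {}
    start = time.monotonic()
    try:
        yield timing
    except Exception:
        timing["seconds"] = time.monotonic() - start
        logger.exception("%s failed after %.2fs", label, timing["seconds"])
        raise  # Re-raise so Airflow still marks the task as failed
    else:
        timing["seconds"] = time.monotonic() - start
        logger.info("%s completed in %.2fs", label, timing["seconds"])
```

A task body then reduces to `with log_duration(logger, context['task_instance'].task_id): return do_work()`.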

Monitoring and Observability

Custom Metrics

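Track custom metrics for monitoring. These are only emitted when a metrics backend is enabled in the [metrics] section of airflow.cfg (for example statsd_on = True):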

python
import time
from datetime import timedelta

from airflow.stats import Stats

def task_with_metrics(**context):
    """Task that emits custom metrics."""
    start_time = time.monotonic()

    try:
        # Do work (process_records() is a project-specific placeholder)
        records_processed = process_records()

        # Emit metrics; a timedelta makes the timer unit unambiguous
        Stats.gauge('opengov.records.processed', records_processed)
        Stats.timing('opengov.task.duration', timedelta(seconds=time.monotonic() - start_time))
        Stats.incr('opengov.task.success')

    except Exception:
        Stats.incr('opengov.task.failure')
        raise
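
With the StatsD backend, each call becomes a small plain-text line of the form `name:value|type`. StatsD timers are expressed in milliseconds, so a duration measured in seconds has to be converted (or passed as a `timedelta`) before it is emitted. The sketch below formats the three metric types used above according to that line protocol. It illustrates the protocol only and is not Airflow's emitter; `statsd_line` is a hypothetical helper. Airflow prepends the configured `statsd_prefix` (default `airflow`) to each name, which the `prefix` parameter imitates.

```python
from datetime import timedelta
from typing import Union

# StatsD line-protocol type suffixes for the three Stats calls used above
_SUFFIXES = {"gauge": "g", "incr": "c", "timing": "ms"}


def statsd_line(
    kind: str,
    name: str,
    value: Union[int, float, timedelta],
    prefix: str = "airflow",
) -> str:
    """Format one StatsD line such as 'airflow.opengov.task.success:1|c'."""
    if kind not in _SUFFIXES:
        raise ValueError(f"Unsupported metric kind: {kind!r}")
    if isinstance(value, timedelta):
        value = value.total_seconds() * 1000  # StatsD timers are in milliseconds
    if float(value).is_integer():
        value = int(value)  # Print 1500 rather than 1500.0
    full_name = f"{prefix}.{name}" if prefix else name
    return f"{full_name}:{value}|{_SUFFIXES[kind]}"
```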

Health Check DAG

Create a DAG to monitor system health:

python
import os

from airflow import DAG
from airflow.operators.python import PythonOperator

def check_database_health():
    """Check database connectivity and health."""
    db = DBAdapter()
    try:
        db.execute("SELECT 1")
        return "healthy"
    except Exception as e:
        raise Exception(f"Database unhealthy: {e}")
    finally:
        db.close()

def check_supabase_health():
    """Check Supabase API health."""
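    import requests
    # Supabase's API gateway normally expects an 'apikey' header;
    # SUPABASE_ANON_KEY is an assumed variable name
    response = requests.get(
        f"{os.getenv('SUPABASE_URL')}/rest/v1/",
        headers={'apikey': os.getenv('SUPABASE_ANON_KEY', '')},
        timeout=10,  # Fail fast instead of hanging the worker
    )
    if response.status_code != 200:
        raise Exception(f"Supabase unhealthy: {response.status_code}")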

with DAG(
    'system_health_check',
    schedule_interval='*/15 * * * *',
    # ... other DAG arguments ...
) as dag:
    db_check = PythonOperator(task_id='check_database', python_callable=check_database_health)
    supabase_check = PythonOperator(task_id='check_supabase', python_callable=check_supabase_health)

    # No dependencies are declared, so both checks run independently

DAG Testing

Local Testing Without Running the Scheduler

Test DAG tasks independently:

python
# tests/test_dags.py
from unittest.mock import Mock

from dags.opengov_extraction import extract_task, transform_task

def test_extract_task():
    """Test extract task logic."""
    # Mock context
    context = {
        'dag': Mock(dag_id='test_dag'),
        'task': Mock(task_id='test_task'),
        'run_id': 'test_run',
    }

    # Execute task
    result = extract_task.python_callable(**context)

    # Assert results
    assert result is not None
    assert len(result) > 0

def test_dag_structure():
    """Test DAG structure and dependencies."""
    from dags.opengov_extraction import dag

    # Check tasks exist
    assert 'extract' in dag.task_ids
    assert 'transform' in dag.task_ids
    assert 'load' in dag.task_ids

    # Check dependencies
    extract = dag.get_task('extract')
    assert 'transform' in [t.task_id for t in extract.downstream_list]

Best Practices

  1. Use default_args - Define common settings once
  2. Implement error callbacks - Log all failures to database
  3. Validate environment - Check required vars at DAG load time
  4. Use task groups - Organize complex DAGs into logical groups
  5. Limit concurrency - Set max_active_runs to prevent resource exhaustion
  6. Set catchup=False - Avoid backfilling unless needed
  7. Tag your DAGs - Use tags for filtering and organization
  8. Document everything - Clear docstrings for DAG and tasks
  9. Use XCom sparingly - Don't pass large data between tasks
  10. Test locally first - Always test before deploying

Common Issues

Import Errors in DAG Files

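The scheduler re-parses DAG files periodically, so top-level code runs on every parse. Keep DAG files importable without side effects, and do expensive work inside task callables: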

python
# ❌ Bad - executes on import
db = DBAdapter()  # Opens connection at import time

# ✅ Good - executes at runtime
def get_db():
    return DBAdapter()

Circular Dependencies

Detect and prevent circular dependencies:

bash
# Run the DAG file directly to surface syntax and import errors
python dags/my_dag.py

# Execute one DAG run locally; a DAG with a dependency cycle fails to load
airflow dags test opengov_workflow 2024-01-01
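
Airflow refuses to load a DAG whose dependencies form a cycle. When dependencies are generated programmatically (for example from a config file), it can help to check the edge map in a unit test before it reaches Airflow. This sketch uses the standard library's `graphlib` (Python 3.9+). `find_cycle` and the `{task_id: upstream task_ids}` mapping format are illustrative choices, not an Airflow API.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def find_cycle(upstream: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return one cycle as a list of task IDs, or None if the graph is acyclic.

    `upstream` maps each task_id to the task_ids it depends on.
    """
    try:
        TopologicalSorter(upstream).prepare()
    except CycleError as err:
        # err.args[1] lists the cycle's nodes, with the first node repeated at the end
        return list(err.args[1])
    return None
```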

Memory Issues with XCom

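Don't pass large datasets via XCom. By default, XCom values are stored in the Airflow metadata database, so store the data externally and pass a small reference instead: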

python
# ❌ Bad - passes large data via XCom
@task
def extract():
    return large_dataset  # Don't do this!

# ✅ Good - store in database/file
@task
def extract():
    save_to_database(large_dataset)
    return {"records": len(large_dataset), "path": "db://opengov.projects"}
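
A cheap safeguard is to measure a value's serialized size before returning it from a task. The sketch assumes JSON-serializable return values, which Airflow's default XCom serialization expects. `ensure_small_xcom` and the 48 KB budget are illustrative choices. The real ceiling depends on your metadata database and XCom backend.

```python
import json
from typing import Any

MAX_XCOM_BYTES = 48 * 1024  # Illustrative budget, not an Airflow limit


def ensure_small_xcom(value: Any, max_bytes: int = MAX_XCOM_BYTES) -> Any:
    """Return `value` unchanged if its JSON encoding fits in max_bytes, else raise."""
    size = len(json.dumps(value, default=str).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "store the data externally and return a reference instead"
        )
    return value
```

In the "good" example this would wrap the returned reference: `return ensure_small_xcom({"records": len(large_dataset), "path": "db://opengov.projects"})`.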

Garden CLI Integration

Use Garden commands to manage DAGs:

bash
# Create new DAG from template
garden dag new my_workflow --template=etl

# List all DAGs
garden dag list

# Monitor specific DAG
garden dag monitor opengov_extraction