This skill provides expertise in managing Apache Airflow DAGs for ETL orchestration. It should be used when the user asks about "airflow dag", "create workflow", "dag management", "airflow orchestration", "schedule dags", or similar Airflow-related topics.
Airflow DAG Management and Best Practices
Master the creation, management, and orchestration of Apache Airflow DAGs for ETL pipelines with proper error handling, database wiring, and monitoring.
Standard DAG Template
Base Template with Best Practices
Every DAG should follow this standard structure:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Import project-specific utilities
from src.database.db_adapter import DBAdapter
from src.utils.logger import setup_logger
logger = setup_logger(__name__)
# Default arguments applied to all tasks
default_args = {
    'owner': 'opengov-harvester',
    'depends_on_past': False,
    # NOTE(review): failure e-mails also need an 'email' recipient list
    # (here or on each task) — confirm one is configured.
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}
# DAG definition
dag = DAG(
    'opengov_example_workflow',
    default_args=default_args,
    description='Example workflow for OpenGov data extraction',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    # Use a fixed start date. days_ago() is deprecated, and because it is
    # recomputed every time this file is parsed, start_date keeps moving.
    start_date=datetime(2024, 1, 1),
    catchup=False,  # don't backfill intervals between start_date and now
    tags=['opengov', 'etl', 'extraction'],
    max_active_runs=1,  # Prevent concurrent runs
)
def task_with_error_handling(**context):
    """Example task with proper error handling.

    Runs the extraction, records success or failure in the database, and
    re-raises any failure so Airflow marks the task as failed.

    Args:
        **context: Airflow task context; ``dag``, ``task`` and ``run_id``
            are read to key the database log records.

    Returns:
        The result of ``perform_extraction()``.

    Raises:
        Exception: whatever the task logic raised, re-raised unchanged.
    """
    dag_id = context['dag'].dag_id
    task_id = context['task'].task_id
    run_id = context['run_id']
    db = DBAdapter()
    try:
        logger.info("Starting task execution")
        result = perform_extraction()
        db.log_success(dag_id=dag_id, task_id=task_id, run_id=run_id, result=result)
        return result
    except Exception as e:
        # Log locally first, so the failure is visible even if the database
        # write below also fails.
        logger.error(f"Task failed: {e}", exc_info=True)
        try:
            db.log_error(dag_id=dag_id, task_id=task_id, run_id=run_id, error=str(e))
        except Exception:
            # Best effort: a broken error log must not replace the real
            # exception re-raised below.
            logger.exception("Failed to record task error in database")
        raise  # Re-raise the original exception to mark the task as failed
    finally:
        db.close()
# Register the single extraction task on the DAG; the callable does its
# own error handling and database logging.
extract_task = PythonOperator(
    dag=dag,
    task_id='extract_data',
    python_callable=task_with_error_handling,
)
# With only one task there are no dependencies to declare. With more tasks,
# chain them here, e.g. extract_task >> transform_task.
DAG Creation Patterns
Pattern 1: Sequential ETL Pipeline
Extract → Transform → Load pattern:
from airflow.operators.python import PythonOperator
def extract(**context):
    """Pull raw records from the upstream source.

    The returned value is what the ``transform`` task pulls via XCom
    (``task_ids='extract'``).
    """
    logger.info("Extracting data...")
    # TODO: real extraction goes here; ``extracted_data`` is a placeholder.
    return extracted_data
def transform(**context):
    """Reshape the records produced by the ``extract`` task.

    Reads the upstream result from XCom. The returned value is what the
    ``load`` task pulls in turn.
    """
    records = context['ti'].xcom_pull(task_ids='extract')
    logger.info(f"Transforming {len(records)} records...")
    # TODO: transformation logic; ``transformed_data`` is a placeholder.
    return transformed_data
def load(**context):
    """Write the ``transform`` task's output to its destination."""
    records = context['ti'].xcom_pull(task_ids='transform')
    logger.info(f"Loading {len(records)} records...")
    # TODO: load logic
# One PythonOperator per ETL step, all attached to the same DAG.
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)
# Strict ordering: extract, then transform, then load.
extract_task.set_downstream(transform_task)
transform_task.set_downstream(load_task)
Pattern 2: Dynamic Task Mapping (Airflow 2.3+)
Process multiple items in parallel:
from airflow.decorators import task
@task
def get_project_list():
    """Return the ids of pending projects to fan out over.

    The returned list drives ``process_project.expand``, which creates one
    mapped task instance per id.
    """
    db = DBAdapter()
    try:
        projects = db.get_pending_projects()
    finally:
        # Close the adapter even if the query fails, like the other DBAdapter
        # users in this guide.
        db.close()
    return [p['project_id'] for p in projects]
@task
def process_project(project_id):
    """Handle one project id (one mapped task instance per project)."""
    logger.info(f"Processing project: {project_id}")
    # TODO: per-project processing logic
    outcome = {"project_id": project_id, "status": "completed"}
    return outcome
@task
def aggregate_results(results):
    """Summarize the outputs of every mapped ``process_project`` instance."""
    project_count = len(results)
    logger.info(f"Processed {project_count} projects")
    # TODO: persist the aggregated results
# NOTE: ``...`` is a documentation placeholder for the remaining DAG
# arguments (schedule, start_date, default_args). Replace it before use:
# as written, Python passes Ellipsis positionally.
with DAG('opengov_parallel_processing', ...) as dag:
    # Fan out one process_project instance per id, then fan the collected
    # results back in.
    project_ids = get_project_list()
    results = process_project.expand(project_id=project_ids)
    aggregate_results(results)
Pattern 3: Conditional Branching
Execute different paths based on conditions:
from airflow.operators.python import BranchPythonOperator
def check_data_freshness(**context):
    """Branch callable: decide whether to skip or proceed with extraction.

    Returns:
        str: ``'skip_extraction'`` if the last update is less than six hours
        old, otherwise ``'run_extraction'``. This is the id of the task to
        follow.
    """
    db = DBAdapter()
    try:
        last_update = db.get_last_update_time()
    finally:
        db.close()
    # timedelta has no ``.hours`` attribute (only days/seconds/microseconds),
    # so compare against a timedelta rather than reading an hour count.
    # NOTE(review): assumes last_update is a naive datetime in the same
    # timezone as datetime.now(); an aware value would raise TypeError here.
    if datetime.now() - last_update < timedelta(hours=6):
        return 'skip_extraction'  # Task ID to execute
    return 'run_extraction'
branch_task = BranchPythonOperator(
    task_id='check_freshness',
    python_callable=check_data_freshness,
    dag=dag,
)
skip_task = PythonOperator(
    task_id='skip_extraction',
    python_callable=lambda: logger.info("Skipping - data is fresh"),
    dag=dag,
)
run_task = PythonOperator(
    task_id='run_extraction',
    # Reuses the ``extract`` callable from Pattern 1. ``extract_data`` is only
    # a task_id in the template, not a function, so naming it here would
    # raise NameError when the file is parsed.
    python_callable=extract,
    dag=dag,
)
# Only the task whose id check_data_freshness returns will run.
branch_task >> [skip_task, run_task]
Database Wiring
Proper Database Connection Configuration
Select the database backend (SQLite or Supabase Postgres) from environment variables so DAG tasks build their connections consistently:
import os
from src.database.db_adapter import DBAdapter
def get_database_config():
    """Build ``DBAdapter`` keyword arguments from the environment.

    ``DB_STORAGE_TARGET`` selects the backend:

    - ``'supabase'`` yields a Postgres config from the ``SUPABASE_DB_*``
      variables.
    - Any other value, including the default ``'sqlite'``, yields a SQLite
      config.

    Returns:
        dict: keyword arguments suitable for ``DBAdapter(**config)``.
    """
    env = os.getenv
    if env('DB_STORAGE_TARGET', 'sqlite') != 'supabase':
        # Every non-Supabase target falls back to SQLite.
        return {
            'type': 'sqlite',
            'database': env('SQLITE_DB_PATH', 'data/db/opengov_state.db'),
        }
    return {
        'type': 'postgres',
        'host': env('SUPABASE_DB_HOST', 'localhost'),
        'port': int(env('SUPABASE_DB_PORT', 54322)),
        'database': env('SUPABASE_DB_NAME', 'postgres'),
        'user': env('SUPABASE_DB_USER', 'postgres'),
        'password': env('SUPABASE_DB_PASSWORD'),
    }
def task_with_db(**context):
    """Query the projects table through an environment-configured adapter.

    The adapter is always closed, whether or not the query succeeds.
    """
    db = DBAdapter(**get_database_config())
    try:
        db.execute("SELECT * FROM projects")
    finally:
        db.close()
Environment Variable Validation
Validate required environment variables at DAG load time:
import os
# Variables that must be set to a non-empty value; checked by
# validate_environment(), which this guide calls at module import time.
# NOTE(review): all three are required regardless of DB_STORAGE_TARGET.
# SUPABASE_URL is only read by check_supabase_health, and
# get_database_config() reads SQLITE_DB_PATH only for the SQLite backend.
# Confirm both really are needed in every deployment.
REQUIRED_ENV_VARS = [
    'DB_STORAGE_TARGET',
    'SUPABASE_URL',
    'SQLITE_DB_PATH',
]
def validate_environment(required_vars=None):
    """Fail fast if any required environment variable is unset or empty.

    Args:
        required_vars: iterable of variable names to check. Defaults to
            ``REQUIRED_ENV_VARS``. Pass a backend-specific list to validate
            only what a given deployment needs.

    Raises:
        EnvironmentError: naming every missing variable, in input order.
    """
    names = REQUIRED_ENV_VARS if required_vars is None else required_vars
    # An empty string counts as missing, same as an unset variable.
    missing = [var for var in names if not os.getenv(var)]
    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
# Validate on DAG load. This call runs at module import time, so a missing
# variable raises EnvironmentError while the DAG file is being parsed.
validate_environment()
Error Handling and Logging
Centralized Error Reporting
Record every task failure in the `run_errors` database table:
def log_dag_error(context):
    """Failure callback: insert a row describing the failed task into ``run_errors``.

    Args:
        context: the Airflow context passed to ``on_failure_callback``.
    """
    # Build the record before connecting, so a malformed context never
    # opens a connection.
    error_record = {
        'dag_id': context['dag'].dag_id,
        'task_id': context['task_instance'].task_id,
        'execution_date': context['execution_date'],
        'error_message': str(context.get('exception')),
        # NOTE: this is the task's log URL, not a Python traceback; the key
        # name is left unchanged because the run_errors schema presumably
        # expects it.
        'traceback': context['task_instance'].log_url,
        'run_id': context['run_id'],
    }
    db = DBAdapter()
    try:
        db.insert('run_errors', error_record)
    finally:
        # Close even if the insert fails, so the callback never leaks a connection.
        db.close()
# Add to default_args: extend the existing dict instead of rebuilding it, so
# the retry and e-mail settings defined earlier are kept.
# (A literal ``{..., 'key': value}`` is a SyntaxError in Python.)
default_args = {
    **default_args,
    'on_failure_callback': log_dag_error,
}
Comprehensive Logging
Log all significant events:
def logged_task(**context):
    """Task with comprehensive logging.

    Logs the task id, run id and execution date on entry, then the elapsed
    time on success or failure. Failures are re-raised.

    Returns:
        The result of ``do_work()``.
    """
    logger.info(f"Task started: {context['task_instance'].task_id}")
    logger.info(f"Run ID: {context['run_id']}")
    logger.info(f"Execution date: {context['execution_date']}")
    start_time = datetime.now()
    try:
        result = do_work()
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        # exc_info=True attaches the traceback, matching the error logging in
        # task_with_error_handling.
        logger.error(f"Task failed after {duration}s: {e}", exc_info=True)
        raise
    duration = (datetime.now() - start_time).total_seconds()
    logger.info(f"Task completed in {duration}s")
    return result
Monitoring and Observability
Custom Metrics
Track custom metrics for monitoring:
import time
from datetime import timedelta

from airflow.stats import Stats


def task_with_metrics(**context):
    """Run the task's work and emit custom metrics for it.

    On success, emits ``opengov.records.processed`` (gauge),
    ``opengov.task.duration`` (timing) and ``opengov.task.success``
    (counter). On failure, increments ``opengov.task.failure`` and re-raises.
    """
    # monotonic() is unaffected by wall-clock adjustments during the run.
    start = time.monotonic()
    try:
        records_processed = process_records()
    except Exception:
        Stats.incr('opengov.task.failure')
        raise
    else:
        Stats.gauge('opengov.records.processed', records_processed)
        # StatsD timers expect milliseconds or a timedelta; raw seconds would
        # under-report the duration by 1000x, so pass a timedelta.
        Stats.timing('opengov.task.duration', timedelta(seconds=time.monotonic() - start))
        Stats.incr('opengov.task.success')
Health Check DAG
Create a DAG to monitor system health:
from airflow import DAG
from airflow.operators.python import PythonOperator
def check_database_health():
    """Probe the database with a trivial query.

    Returns:
        str: ``"healthy"`` when ``SELECT 1`` succeeds.

    Raises:
        Exception: ``"Database unhealthy: ..."`` wrapping the underlying error.
    """
    conn = DBAdapter()
    try:
        conn.execute("SELECT 1")
    except Exception as err:
        raise Exception(f"Database unhealthy: {err}")
    else:
        return "healthy"
    finally:
        conn.close()
def check_supabase_health(timeout=10):
    """Check that the Supabase REST endpoint answers with HTTP 200.

    Args:
        timeout: seconds to wait for the HTTP request. Without a timeout a
            hung endpoint would block the worker instead of failing the check.

    Returns:
        str: ``"healthy"`` on HTTP 200, mirroring ``check_database_health``.

    Raises:
        Exception: if ``SUPABASE_URL`` is unset or the status is not 200.
        requests.RequestException: on connection errors or timeouts.
    """
    import os

    import requests

    base_url = os.getenv('SUPABASE_URL')
    if not base_url:
        raise Exception("Supabase unhealthy: SUPABASE_URL is not set")
    # NOTE(review): Supabase's REST gateway typically rejects requests that
    # lack an ``apikey`` header (401). Confirm whether a key must be sent here.
    response = requests.get(f"{base_url.rstrip('/')}/rest/v1/", timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Supabase unhealthy: {response.status_code}")
    return "healthy"
from datetime import datetime

# A positional ``...`` after a keyword argument is a SyntaxError, so the
# DAG arguments are spelled out here.
with DAG(
    'system_health_check',
    schedule_interval='*/15 * * * *',
    start_date=datetime(2024, 1, 1),  # fixed, not dynamic, start date
    catchup=False,  # only the latest 15-minute interval is worth checking
    max_active_runs=1,  # don't stack runs if a check is slow
) as dag:
    db_check = PythonOperator(task_id='check_database', python_callable=check_database_health)
    supabase_check = PythonOperator(task_id='check_supabase', python_callable=check_supabase_health)
    # The two checks are independent, so no dependency is declared between them.
DAG Testing
Local Testing Without Airflow
Test DAG tasks independently:
# tests/test_dags.py
from unittest.mock import Mock

import pytest
from dags.opengov_extraction import extract_task, transform_task


def test_extract_task():
    """Call the extract task's callable directly with a mocked context."""
    # Mock context (Mock comes from unittest.mock, imported above). Add keys
    # such as 'ti' if the callable pulls XComs.
    context = {
        'dag': Mock(dag_id='test_dag'),
        'task': Mock(task_id='test_task'),
        'run_id': 'test_run',
    }
    # Execute task
    result = extract_task.python_callable(**context)
    # Assert results
    assert result is not None
    assert len(result) > 0
def test_dag_structure():
    """The ETL DAG registers extract/transform/load, with extract feeding transform."""
    from dags.opengov_extraction import dag

    # Every ETL step must be registered on the DAG.
    for expected_id in ('extract', 'transform', 'load'):
        assert expected_id in dag.task_ids

    # transform must be an immediate downstream of extract.
    downstream_ids = {t.task_id for t in dag.get_task('extract').downstream_list}
    assert 'transform' in downstream_ids
Best Practices
- Use default_args - Define common settings once
- Implement error callbacks - Log all failures to the database
- Validate environment - Check required vars at DAG load time
- Use task groups - Organize complex DAGs into logical groups
- Limit concurrency - Set max_active_runs to prevent resource exhaustion
- Set catchup=False - Avoid backfilling unless needed
- Tag your DAGs - Use tags for filtering and organization
- Document everything - Write clear docstrings for DAGs and tasks
- Use XCom sparingly - Don't pass large data between tasks
- Test locally first - Always test before deploying
Common Issues
Import Errors in DAG Files
DAGs must be importable without executing tasks:
# ❌ Bad - executes on import
# (module-level statements run every time this file is imported)
db = DBAdapter() # Opens connection at import time
# ✅ Good - executes at runtime
def get_db():
    """Create a DBAdapter on demand, from inside a task, not at import time."""
    return DBAdapter()
Circular Dependencies
Detect and prevent circular dependencies:
# Test DAG file for errors
python dags/my_dag.py
# Check for circular dependencies
airflow dags test opengov_workflow 2024-01-01
Memory Issues with XCom
Don't pass large datasets via XCom:
# ❌ Bad - passes large data via XCom
@task
def extract():
    """Anti-pattern: the whole dataset becomes this task's return value."""
    return large_dataset # Don't do this!
# ✅ Good - store in database/file
@task
def extract():
    """Persist the dataset, then return only a small summary and its location."""
    save_to_database(large_dataset)
    # Downstream tasks receive just this small dict and read the data from "path".
    return {"records": len(large_dataset), "path": "db://opengov.projects"}
Garden CLI Integration
Use Garden commands to manage DAGs:
# Create new DAG from template
garden dag new my_workflow --template=etl
# List all DAGs
garden dag list
# Monitor specific DAG
garden dag monitor opengov_extraction