AgentSkillsCN

Error Handling

采用韧性策略、回退模式、熔断器与优雅降级机制进行重试

SKILL.md
--- frontmatter
description: Retry strategies with tenacity, fallback patterns, circuit breakers, graceful degradation

Error Handling

Retry strategies with tenacity, fallback patterns, circuit breakers, graceful degradation

Error Handling Skill

Implement robust error handling patterns - retry strategies, fallbacks, circuit breakers, and graceful degradation for resilient agent systems.

Error Handling Skill

Implement robust error handling patterns - retry strategies, fallbacks, circuit breakers, and graceful degradation for resilient agent systems.

Process

Step 1: Basic Retry with Tenacity

Implement retry logic for transient failures:

python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log
)
import logging
from langchain_openai import ChatOpenAI
import httpx

logger = logging.getLogger(__name__)

# Basic retry decorator
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(Exception),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO)
)
async def unreliable_api_call(endpoint: str) -> str:
    """API call with automatic retry."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(endpoint)
        response.raise_for_status()
        return response.text

# Retry with specific exceptions
from httpx import HTTPStatusError, TimeoutException

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=1, max=60),
    retry=retry_if_exception_type((HTTPStatusError, TimeoutException)),
    reraise=True
)
async def robust_api_call(url: str) -> dict:
    """Retry on specific exceptions only."""
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

# Retry LLM calls
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(Exception)
)
async def retryable_llm_call(prompt: str) -> str:
    """LLM call with retry."""
    llm = ChatOpenAI(model="gpt-4", temperature=0.7)
    response = await llm.ainvoke(prompt)
    return response.content

Step 2: Advanced Retry Strategies

Custom retry conditions and backoff strategies:

python
from tenacity import (
    retry,
    stop_after_delay,
    wait_fixed,
    wait_random,
    wait_chain,
    retry_if_result,
    retry_if_exception_message
)

# Retry until timeout
@retry(
    stop=stop_after_delay(30),  # Stop after 30 seconds
    wait=wait_fixed(2)
)
async def retry_until_timeout():
    """Retry until time limit."""
    # Implementation
    pass

# Random wait between retries
@retry(
    stop=stop_after_attempt(5),
    wait=wait_random(min=1, max=5)  # Random 1-5 seconds
)
async def retry_with_random_wait():
    """Retry with random wait."""
    pass

# Chained wait strategy
@retry(
    stop=stop_after_attempt(5),
    wait=wait_chain(
        wait_fixed(1) * 3,  # Wait 1s for first 3 attempts
        wait_exponential(multiplier=2, min=2, max=10)  # Then exponential
    )
)
async def retry_with_chained_wait():
    """Retry with chained wait strategy."""
    pass

# Retry based on result
@retry(
    stop=stop_after_attempt(3),
    retry=retry_if_result(lambda result: result is None or result == "")
)
async def retry_on_empty_result():
    """Retry if result is empty."""
    result = await some_operation()
    return result

# Retry based on exception message
@retry(
    stop=stop_after_attempt(3),
    retry=retry_if_exception_message(match=".*rate limit.*")
)
async def retry_on_rate_limit():
    """Retry only on rate limit errors."""
    # Implementation
    pass

Step 3: Fallback Patterns

Implement fallback mechanisms:

python
from typing import Callable, List, Any
import asyncio

class FallbackChain:
    """Execute functions in order until one succeeds."""
    
    def __init__(self, fallbacks: List[Callable]):
        self.fallbacks = fallbacks
    
    async def execute(self, *args, **kwargs) -> Any:
        """Execute fallbacks in order."""
        last_error = None
        
        for i, fallback in enumerate(self.fallbacks):
            try:
                if asyncio.iscoroutinefunction(fallback):
                    result = await fallback(*args, **kwargs)
                else:
                    result = fallback(*args, **kwargs)
                
                # Success - return result
                return result
            except Exception as e:
                last_error = e
                logger.warning(f"Fallback {i} failed: {e}")
                continue
        
        # All fallbacks failed
        raise Exception(f"All fallbacks failed. Last error: {last_error}")

# Usage with multiple LLM providers
async def primary_llm(prompt: str) -> str:
    """Primary LLM provider."""
    llm = ChatOpenAI(model="gpt-4")
    response = await llm.ainvoke(prompt)
    return response.content

async def fallback_llm(prompt: str) -> str:
    """Fallback LLM provider."""
    from langchain_anthropic import ChatAnthropic
    llm = ChatAnthropic(model="claude-3-5-sonnet")
    response = await llm.ainvoke(prompt)
    return response.content

async def local_fallback(prompt: str) -> str:
    """Local fallback (e.g., cached response)."""
    return "Default response from cache"

# Create fallback chain
llm_chain = FallbackChain([primary_llm, fallback_llm, local_fallback])

# Use fallback chain
try:
    result = await llm_chain.execute("What is Python?")
except Exception as e:
    logger.error(f"All LLM providers failed: {e}")

# Fallback with default value
async def safe_llm_call(prompt: str, default: str = "Unable to generate response") -> str:
    """LLM call with default fallback."""
    try:
        llm = ChatOpenAI(model="gpt-4")
        response = await llm.ainvoke(prompt)
        return response.content
    except Exception as e:
        logger.error(f"LLM call failed: {e}")
        return default

Step 4: Circuit Breaker Pattern

Implement circuit breaker to prevent cascading failures:

python
from enum import Enum
from datetime import datetime
from typing import Callable, Optional
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """Circuit breaker pattern implementation."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED
    
    def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed >= self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        # Execute function
        try:
            if asyncio.iscoroutinefunction(func):
                result = asyncio.run(func(*args, **kwargs))
            else:
                result = func(*args, **kwargs)
            
            # Success - reset if half-open
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise e
    
    async def acall(self, func: Callable, *args, **kwargs):
        """Async version of call."""
        if self.state == CircuitState.OPEN:
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed >= self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise e

# Usage
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60,
    expected_exception=Exception
)

async def protected_api_call(url: str) -> dict:
    """API call protected by circuit breaker."""
    async def _call():
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
    
    return await circuit_breaker.acall(_call)

Step 5: Graceful Degradation

Implement graceful degradation for partial failures:

python
from typing import Dict, List
from pydantic import BaseModel

class DegradedResponse(BaseModel):
    """Response with degradation metadata."""
    data: Dict
    degraded_features: List[str]
    full_response: bool

class GracefulDegradation:
    """Implement graceful degradation patterns."""
    
    async def get_user_profile(self, user_id: str) -> DegradedResponse:
        """Get user profile with graceful degradation."""
        degraded_features = []
        profile = {}
        
        # Try to get full profile
        try:
            full_profile = await self._fetch_full_profile(user_id)
            return DegradedResponse(
                data=full_profile,
                degraded_features=[],
                full_response=True
            )
        except Exception as e:
            logger.warning(f"Full profile fetch failed: {e}")
        
        # Degrade to cached profile
        try:
            cached_profile = await self._get_cached_profile(user_id)
            degraded_features.append("real_time_data")
            profile.update(cached_profile)
        except Exception as e:
            logger.warning(f"Cached profile fetch failed: {e}")
            degraded_features.append("cached_data")
        
        # Degrade to minimal profile
        try:
            minimal_profile = await self._get_minimal_profile(user_id)
            profile.update(minimal_profile)
        except Exception as e:
            logger.error(f"Minimal profile fetch failed: {e}")
            # Return empty profile as last resort
            profile = {"user_id": user_id, "status": "degraded"}
        
        return DegradedResponse(
            data=profile,
            degraded_features=degraded_features,
            full_response=False
        )
    
    async def _fetch_full_profile(self, user_id: str) -> Dict:
        """Fetch full profile from primary source."""
        # Implementation
        pass
    
    async def _get_cached_profile(self, user_id: str) -> Dict:
        """Get cached profile."""
        # Implementation
        pass
    
    async def _get_minimal_profile(self, user_id: str) -> Dict:
        """Get minimal profile from fallback."""
        # Implementation
        pass

# Graceful degradation for LLM features
class LLMDegradation:
    """Graceful degradation for LLM features."""
    
    async def generate_response(self, prompt: str) -> str:
        """Generate response with feature degradation."""
        # Try full-featured generation
        try:
            return await self._full_generation(prompt)
        except Exception as e:
            logger.warning(f"Full generation failed: {e}")
        
        # Degrade to simpler model
        try:
            return await self._simple_generation(prompt)
        except Exception as e:
            logger.warning(f"Simple generation failed: {e}")
        
        # Degrade to template-based response
        return self._template_response(prompt)
    
    async def _full_generation(self, prompt: str) -> str:
        """Full-featured generation."""
        llm = ChatOpenAI(model="gpt-4", temperature=0.7)
        response = await llm.ainvoke(prompt)
        return response.content
    
    async def _simple_generation(self, prompt: str) -> str:
        """Simpler model generation."""
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        response = await llm.ainvoke(prompt)
        return response.content
    
    def _template_response(self, prompt: str) -> str:
        """Template-based fallback."""
        return f"I apologize, but I'm experiencing technical difficulties. Your query was: {prompt[:50]}..."

Step 6: Error Recovery and Compensation

Implement recovery and compensation patterns:

python
from typing import Callable, List

class CompensationAction:
    """Represents a compensation action for rollback."""
    
    def __init__(self, action: Callable, *args, **kwargs):
        self.action = action
        self.args = args
        self.kwargs = kwargs
    
    async def execute(self):
        """Execute compensation."""
        if asyncio.iscoroutinefunction(self.action):
            await self.action(*self.args, **self.kwargs)
        else:
            self.action(*self.args, **self.kwargs)

class TransactionalOperation:
    """Transactional operation with compensation."""
    
    def __init__(self):
        self.actions: List[Callable] = []
        self.compensations: List[CompensationAction] = []
    
    def add_action(self, action: Callable, compensation: Callable = None, *args, **kwargs):
        """Add action with optional compensation."""
        self.actions.append(action)
        if compensation:
            self.compensations.append(CompensationAction(compensation, *args, **kwargs))
    
    async def execute(self) -> bool:
        """Execute all actions, compensating on failure."""
        executed = []
        
        try:
            for i, action in enumerate(self.actions):
                if asyncio.iscoroutinefunction(action):
                    await action()
                else:
                    action()
                executed.append(i)
            
            return True
        
        except Exception as e:
            logger.error(f"Operation failed: {e}")
            # Compensate in reverse order
            for i in reversed(executed):
                if i < len(self.compensations):
                    try:
                        await self.compensations[i].execute()
                    except Exception as comp_error:
                        logger.error(f"Compensation failed: {comp_error}")
            
            return False

# Usage
async def create_user_profile(user_id: str, data: dict):
    """Create user profile."""
    # Implementation
    pass

async def delete_user_profile(user_id: str):
    """Delete user profile (compensation)."""
    # Implementation
    pass

transaction = TransactionalOperation()
transaction.add_action(
    lambda: create_user_profile("user_123", {}),
    compensation=lambda: delete_user_profile("user_123")
)

success = await transaction.execute()
python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
    after_log
)
import logging
from langchain_openai import ChatOpenAI
import httpx

logger = logging.getLogger(__name__)

# Basic retry decorator
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(Exception),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO)
)
async def unreliable_api_call(endpoint: str) -> str:
    """API call with automatic retry."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(endpoint)
        response.raise_for_status()
        return response.text

# Retry with specific exceptions
from httpx import HTTPStatusError, TimeoutException

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=1, max=60),
    retry=retry_if_exception_type((HTTPStatusError, TimeoutException)),
    reraise=True
)
async def robust_api_call(url: str) -> dict:
    """Retry on specific exceptions only."""
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

# Retry LLM calls
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(Exception)
)
async def retryable_llm_call(prompt: str) -> str:
    """LLM call with retry."""
    llm = ChatOpenAI(model="gpt-4", temperature=0.7)
    response = await llm.ainvoke(prompt)
    return response.content
python
from tenacity import (
    retry,
    stop_after_delay,
    wait_fixed,
    wait_random,
    wait_chain,
    retry_if_result,
    retry_if_exception_message
)

# Retry until timeout
@retry(
    stop=stop_after_delay(30),  # Stop after 30 seconds
    wait=wait_fixed(2)
)
async def retry_until_timeout():
    """Retry until time limit."""
    # Implementation
    pass

# Random wait between retries
@retry(
    stop=stop_after_attempt(5),
    wait=wait_random(min=1, max=5)  # Random 1-5 seconds
)
async def retry_with_random_wait():
    """Retry with random wait."""
    pass

# Chained wait strategy
@retry(
    stop=stop_after_attempt(5),
    wait=wait_chain(
        wait_fixed(1) * 3,  # Wait 1s for first 3 attempts
        wait_exponential(multiplier=2, min=2, max=10)  # Then exponential
    )
)
async def retry_with_chained_wait():
    """Retry with chained wait strategy."""
    pass

# Retry based on result
@retry(
    stop=stop_after_attempt(3),
    retry=retry_if_result(lambda result: result is None or result == "")
)
async def retry_on_empty_result():
    """Retry if result is empty."""
    result = await some_operation()
    return result

# Retry based on exception message
@retry(
    stop=stop_after_attempt(3),
    retry=retry_if_exception_message(match=".*rate limit.*")
)
async def retry_on_rate_limit():
    """Retry only on rate limit errors."""
    # Implementation
    pass
python
from typing import Callable, List, Any
import asyncio

class FallbackChain:
    """Execute functions in order until one succeeds."""
    
    def __init__(self, fallbacks: List[Callable]):
        self.fallbacks = fallbacks
    
    async def execute(self, *args, **kwargs) -> Any:
        """Execute fallbacks in order."""
        last_error = None
        
        for i, fallback in enumerate(self.fallbacks):
            try:
                if asyncio.iscoroutinefunction(fallback):
                    result = await fallback(*args, **kwargs)
                else:
                    result = fallback(*args, **kwargs)
                
                # Success - return result
                return result
            except Exception as e:
                last_error = e
                logger.warning(f"Fallback {i} failed: {e}")
                continue
        
        # All fallbacks failed
        raise Exception(f"All fallbacks failed. Last error: {last_error}")

# Usage with multiple LLM providers
async def primary_llm(prompt: str) -> str:
    """Primary LLM provider."""
    llm = ChatOpenAI(model="gpt-4")
    response = await llm.ainvoke(prompt)
    return response.content

async def fallback_llm(prompt: str) -> str:
    """Fallback LLM provider."""
    from langchain_anthropic import ChatAnthropic
    llm = ChatAnthropic(model="claude-3-5-sonnet")
    response = await llm.ainvoke(prompt)
    return response.content

async def local_fallback(prompt: str) -> str:
    """Local fallback (e.g., cached response)."""
    return "Default response from cache"

# Create fallback chain
llm_chain = FallbackChain([primary_llm, fallback_llm, local_fallback])

# Use fallback chain
try:
    result = await llm_chain.execute("What is Python?")
except Exception as e:
    logger.error(f"All LLM providers failed: {e}")

# Fallback with default value
async def safe_llm_call(prompt: str, default: str = "Unable to generate response") -> str:
    """LLM call with default fallback."""
    try:
        llm = ChatOpenAI(model="gpt-4")
        response = await llm.ainvoke(prompt)
        return response.content
    except Exception as e:
        logger.error(f"LLM call failed: {e}")
        return default
python
from enum import Enum
from datetime import datetime
from typing import Callable, Optional
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """Circuit breaker pattern implementation."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED
    
    def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed >= self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        # Execute function
        try:
            if asyncio.iscoroutinefunction(func):
                result = asyncio.run(func(*args, **kwargs))
            else:
                result = func(*args, **kwargs)
            
            # Success - reset if half-open
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise e
    
    async def acall(self, func: Callable, *args, **kwargs):
        """Async version of call."""
        if self.state == CircuitState.OPEN:
            if self.last_failure_time:
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed >= self.timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise e

# Usage
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60,
    expected_exception=Exception
)

async def protected_api_call(url: str) -> dict:
    """API call protected by circuit breaker."""
    async def _call():
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.json()
    
    return await circuit_breaker.acall(_call)
python
from typing import Dict, List
from pydantic import BaseModel

class DegradedResponse(BaseModel):
    """Response with degradation metadata."""
    data: Dict
    degraded_features: List[str]
    full_response: bool

class GracefulDegradation:
    """Implement graceful degradation patterns."""
    
    async def get_user_profile(self, user_id: str) -> DegradedResponse:
        """Get user profile with graceful degradation."""
        degraded_features = []
        profile = {}
        
        # Try to get full profile
        try:
            full_profile = await self._fetch_full_profile(user_id)
            return DegradedResponse(
                data=full_profile,
                degraded_features=[],
                full_response=True
            )
        except Exception as e:
            logger.warning(f"Full profile fetch failed: {e}")
        
        # Degrade to cached profile
        try:
            cached_profile = await self._get_cached_profile(user_id)
            degraded_features.append("real_time_data")
            profile.update(cached_profile)
        except Exception as e:
            logger.warning(f"Cached profile fetch failed: {e}")
            degraded_features.append("cached_data")
        
        # Degrade to minimal profile
        try:
            minimal_profile = await self._get_minimal_profile(user_id)
            profile.update(minimal_profile)
        except Exception as e:
            logger.error(f"Minimal profile fetch failed: {e}")
            # Return empty profile as last resort
            profile = {"user_id": user_id, "status": "degraded"}
        
        return DegradedResponse(
            data=profile,
            degraded_features=degraded_features,
            full_response=False
        )
    
    async def _fetch_full_profile(self, user_id: str) -> Dict:
        """Fetch full profile from primary source."""
        # Implementation
        pass
    
    async def _get_cached_profile(self, user_id: str) -> Dict:
        """Get cached profile."""
        # Implementation
        pass
    
    async def _get_minimal_profile(self, user_id: str) -> Dict:
        """Get minimal profile from fallback."""
        # Implementation
        pass

# Graceful degradation for LLM features
class LLMDegradation:
    """Graceful degradation for LLM features."""
    
    async def generate_response(self, prompt: str) -> str:
        """Generate response with feature degradation."""
        # Try full-featured generation
        try:
            return await self._full_generation(prompt)
        except Exception as e:
            logger.warning(f"Full generation failed: {e}")
        
        # Degrade to simpler model
        try:
            return await self._simple_generation(prompt)
        except Exception as e:
            logger.warning(f"Simple generation failed: {e}")
        
        # Degrade to template-based response
        return self._template_response(prompt)
    
    async def _full_generation(self, prompt: str) -> str:
        """Full-featured generation."""
        llm = ChatOpenAI(model="gpt-4", temperature=0.7)
        response = await llm.ainvoke(prompt)
        return response.content
    
    async def _simple_generation(self, prompt: str) -> str:
        """Simpler model generation."""
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        response = await llm.ainvoke(prompt)
        return response.content
    
    def _template_response(self, prompt: str) -> str:
        """Template-based fallback."""
        return f"I apologize, but I'm experiencing technical difficulties. Your query was: {prompt[:50]}..."
python
from typing import Callable, List

class CompensationAction:
    """Represents a compensation action for rollback."""
    
    def __init__(self, action: Callable, *args, **kwargs):
        self.action = action
        self.args = args
        self.kwargs = kwargs
    
    async def execute(self):
        """Execute compensation."""
        if asyncio.iscoroutinefunction(self.action):
            await self.action(*self.args, **self.kwargs)
        else:
            self.action(*self.args, **self.kwargs)

class TransactionalOperation:
    """Transactional operation with compensation."""
    
    def __init__(self):
        self.actions: List[Callable] = []
        self.compensations: List[CompensationAction] = []
    
    def add_action(self, action: Callable, compensation: Callable = None, *args, **kwargs):
        """Add action with optional compensation."""
        self.actions.append(action)
        if compensation:
            self.compensations.append(CompensationAction(compensation, *args, **kwargs))
    
    async def execute(self) -> bool:
        """Execute all actions, compensating on failure."""
        executed = []
        
        try:
            for i, action in enumerate(self.actions):
                if asyncio.iscoroutinefunction(action):
                    await action()
                else:
                    action()
                executed.append(i)
            
            return True
        
        except Exception as e:
            logger.error(f"Operation failed: {e}")
            # Compensate in reverse order
            for i in reversed(executed):
                if i < len(self.compensations):
                    try:
                        await self.compensations[i].execute()
                    except Exception as comp_error:
                        logger.error(f"Compensation failed: {comp_error}")
            
            return False

# Usage
async def create_user_profile(user_id: str, data: dict):
    """Create user profile."""
    # Implementation
    pass

async def delete_user_profile(user_id: str):
    """Delete user profile (compensation)."""
    # Implementation
    pass

transaction = TransactionalOperation()
transaction.add_action(
    lambda: create_user_profile("user_123", {}),
    compensation=lambda: delete_user_profile("user_123")
)

success = await transaction.execute()

Error Handling Patterns

PatternUse CaseImplementation
RetryTransient failuresTenacity decorators
FallbackService unavailabilityFallbackChain
Circuit BreakerCascading failuresCircuitBreaker class
Graceful DegradationPartial failuresFeature degradation
CompensationTransaction rollbackCompensation actions
TimeoutLong-running operationsasyncio.timeout

Best Practices

  • Use exponential backoff for retries
  • Set appropriate retry limits
  • Implement circuit breakers for external services
  • Provide meaningful fallbacks
  • Log all errors for debugging
  • Monitor error rates
  • Use timeouts for all I/O operations
  • Implement graceful degradation
  • Test error scenarios
  • Document error handling strategies

Anti-Patterns

Anti-PatternFix
Infinite retriesSet retry limits
No backoffUse exponential backoff
Ignoring errorsHandle and log all errors
No timeoutsSet timeouts for I/O
No fallbacksImplement fallback chains
Retrying non-retryable errorsCheck error types
No circuit breakersAdd circuit breakers
Silent failuresLog and report errors
No compensationImplement rollback logic
Ignoring rate limitsHandle rate limit errors

Related

  • Skill: caching-optimization
  • Skill: tool-usage
  • Skill: langchain-usage

Prerequisites

[!IMPORTANT] Requirements:

  • Packages: tenacity, langchain-core, langchain-openai, httpx