AgentSkillsCN

async-patterns

在 Python 3.13 中运用 async/await 模式,并辅以全面的错误处理与资源清理。在实现异步代码、并发操作,或与异步库协同工作时,积极运用此技能。 主动用于:异步函数、asyncio 任务、aiohttp 请求、async context managers、并发 API 调用、后台任务。 关键词:async、await、asyncio、并发、aiohttp、gather、create_task、timeout。

SKILL.md
--- frontmatter
name: async-patterns
description: |
  Async/await patterns with comprehensive error handling and resource cleanup for Python 3.13. Use this skill when implementing asynchronous code, concurrent operations, or working with async libraries.
  PROACTIVELY use for: async functions, asyncio tasks, aiohttp requests, async context managers, concurrent API calls, background tasks.
  Keywords: async, await, asyncio, concurrent, aiohttp, gather, create_task, timeout.

Async/Await Patterns with Comprehensive Error Handling

Core Principles

Async programming in Python 3.13 enables high-performance I/O-bound operations. All async code in Vibekit MUST include comprehensive error handling and proper resource cleanup.

Basic Async Functions

Every async function must handle errors properly:

python
import asyncio
from typing import Any

# ✅ REQUIRED: Comprehensive error handling in async functions
async def fetch_data(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """
    Fetch data from URL with timeout and error handling.

    Raises:
        TimeoutError: If request exceeds timeout
        ConnectionError: If network request fails
        ValueError: If response is not valid JSON
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                response.raise_for_status()  # Raises on 4xx/5xx
                return await response.json()
    except asyncio.TimeoutError:
        raise TimeoutError(f"Request to {url} timed out after {timeout}s")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")
    except ValueError as e:
        raise ValueError(f"Invalid JSON response from {url}: {e}")
    # Let other exceptions propagate

# ❌ FORBIDDEN: Async function without error handling
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()  # No timeout, no error handling!

Concurrent Execution with asyncio.gather()

Run multiple async operations concurrently:

python
import asyncio
from typing import Any

# ✅ REQUIRED: Use asyncio.gather() for concurrent operations
async def fetch_multiple_urls(urls: list[str]) -> list[dict[str, Any]]:
    """
    Fetch multiple URLs concurrently.

    Returns list of results in same order as input URLs.
    Failed requests return None instead of raising.
    """
    tasks = [fetch_data(url) for url in urls]

    # return_exceptions=True prevents one failure from cancelling all tasks
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results and exceptions
    processed: list[dict[str, Any]] = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error fetching {urls[i]}: {result}")
            processed.append(None)
        else:
            processed.append(result)

    return processed

# Alternative: Fail-fast (any error cancels all)
async def fetch_multiple_strict(urls: list[str]) -> list[dict[str, Any]]:
    """Fetch URLs concurrently, raise on first error."""
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)  # return_exceptions=False (default)

Task Management with create_task()

Control task lifecycle explicitly:

python
import asyncio
from typing import Set

# ✅ REQUIRED: Track tasks and handle cleanup
class BackgroundTaskManager:
    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        """Create task and track it."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)  # Auto-remove when done
        return task

    async def shutdown(self) -> None:
        """Cancel all pending tasks."""
        for task in self.tasks:
            task.cancel()

        # Wait for all cancellations to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()

# Usage
async def main():
    manager = BackgroundTaskManager()

    # Start background tasks
    task1 = manager.create_task(long_running_operation())
    task2 = manager.create_task(periodic_cleanup())

    try:
        # Do main work
        await some_main_task()
    finally:
        # Clean shutdown
        await manager.shutdown()

Async Context Managers

Ensure proper resource cleanup in async code:

python
import asyncio
from typing import AsyncIterator

# ✅ REQUIRED: Use async context managers for resources
class AsyncDatabaseConnection:
    """Async context manager for database connections."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.conn = None

    async def __aenter__(self):
        """Establish connection on enter."""
        try:
            self.conn = await asyncio.wait_for(
                connect_to_database(self.connection_string),
                timeout=5.0
            )
            return self.conn
        except asyncio.TimeoutError:
            raise TimeoutError("Database connection timeout")
        except Exception as e:
            raise ConnectionError(f"Failed to connect: {e}")

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection on exit, even if exception occurred."""
        if self.conn is not None:
            try:
                await asyncio.wait_for(self.conn.close(), timeout=5.0)
            except Exception as e:
                print(f"Error closing connection: {e}")
        return False  # Don't suppress exceptions

# Usage
async def query_database():
    async with AsyncDatabaseConnection("postgresql://...") as conn:
        result = await conn.execute("SELECT * FROM users")
        return result

# Alternative: Using contextlib.asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection(conn_str: str) -> AsyncIterator[Any]:
    """Context manager for database connections."""
    conn = None
    try:
        conn = await connect_to_database(conn_str)
        yield conn
    finally:
        if conn:
            await conn.close()

Timeouts and Cancellation

Always set timeouts for async operations:

python
import asyncio
from typing import Any

# ✅ REQUIRED: Use timeouts for all external I/O
async def fetch_with_timeout(url: str, timeout: float = 10.0) -> dict[str, Any]:
    """Fetch data with overall timeout."""
    try:
        return await asyncio.wait_for(
            fetch_data(url),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation timed out after {timeout}s")

# ✅ REQUIRED: Handle cancellation gracefully
async def long_running_task():
    """Task that handles cancellation properly."""
    try:
        for i in range(100):
            await asyncio.sleep(1)
            print(f"Step {i}")
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation

# ❌ FORBIDDEN: Async operation without timeout
async def bad_fetch():
    return await fetch_data("https://slow-api.com")  # No timeout!

Error Recovery with Retry Logic

Implement retry patterns for transient failures:

python
import asyncio
from typing import Any, Callable

# ✅ REQUIRED: Retry logic for transient errors
async def retry_async(
    func: Callable,
    *args,
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    **kwargs
) -> Any:
    """
    Retry async function with exponential backoff.

    Args:
        func: Async function to retry
        max_attempts: Maximum number of attempts
        delay: Initial delay between retries (seconds)
        backoff: Multiplier for delay after each attempt
        *args, **kwargs: Arguments to pass to func

    Returns:
        Result of func

    Raises:
        Last exception if all attempts fail
    """
    last_exception = None
    current_delay = delay

    for attempt in range(max_attempts):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e
            if attempt < max_attempts - 1:
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {current_delay}s...")
                await asyncio.sleep(current_delay)
                current_delay *= backoff
            else:
                print(f"All {max_attempts} attempts failed")

    raise last_exception

# Usage
async def unreliable_api_call(endpoint: str) -> dict:
    return await fetch_data(endpoint)

result = await retry_async(
    unreliable_api_call,
    "https://api.example.com/data",
    max_attempts=5,
    delay=1.0,
    backoff=2.0
)

Structured Concurrency with TaskGroup (Python 3.11+)

Use TaskGroup for managing related tasks:

python
import asyncio

# ✅ REQUIRED: Use TaskGroup for structured concurrency
async def fetch_all_data() -> list[dict]:
    """Fetch data from multiple sources using TaskGroup."""
    results = []

    async with asyncio.TaskGroup() as tg:
        # All tasks started within the group
        task1 = tg.create_task(fetch_data("https://api1.example.com"))
        task2 = tg.create_task(fetch_data("https://api2.example.com"))
        task3 = tg.create_task(fetch_data("https://api3.example.com"))

    # At this point, all tasks have completed or an exception was raised
    results = [task1.result(), task2.result(), task3.result()]
    return results

# TaskGroup automatically:
# - Waits for all tasks to complete
# - Cancels remaining tasks if one raises an exception
# - Raises the first exception that occurred

Async Generators

Create async iterators for streaming data:

python
import asyncio
from typing import AsyncIterator

# ✅ REQUIRED: Async generators for streaming data
async def fetch_paginated_data(api_url: str) -> AsyncIterator[dict]:
    """Fetch paginated data, yielding each item."""
    page = 1
    while True:
        try:
            response = await fetch_data(f"{api_url}?page={page}")
            items = response.get("items", [])

            if not items:
                break

            for item in items:
                yield item

            page += 1
            await asyncio.sleep(0.1)  # Rate limiting
        except Exception as e:
            print(f"Error fetching page {page}: {e}")
            break

# Usage
async def process_all_items():
    async for item in fetch_paginated_data("https://api.example.com/items"):
        await process_item(item)

Rate Limiting

Implement rate limiting for API calls:

python
import asyncio
import time
from typing import Any

class RateLimiter:
    """Rate limiter for async operations."""

    def __init__(self, max_calls: int, period: float):
        """
        Initialize rate limiter.

        Args:
            max_calls: Maximum number of calls allowed
            period: Time period in seconds
        """
        self.max_calls = max_calls
        self.period = period
        self.calls: list[float] = []
        self.lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until a call slot is available."""
        async with self.lock:
            now = time.time()

            # Remove old calls outside the window
            self.calls = [t for t in self.calls if now - t < self.period]

            if len(self.calls) >= self.max_calls:
                # Wait until the oldest call expires
                sleep_time = self.period - (now - self.calls[0])
                await asyncio.sleep(sleep_time)
                self.calls.pop(0)

            self.calls.append(time.time())

# Usage
limiter = RateLimiter(max_calls=10, period=60.0)  # 10 calls per minute

async def rate_limited_fetch(url: str) -> dict:
    await limiter.acquire()
    return await fetch_data(url)

Anti-Patterns to Avoid

❌ No Error Handling in Async Functions

python
# BAD: Silent failures
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# GOOD: Explicit error handling
async def good_fetch(url: str) -> dict:
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Failed to fetch {url}: {e}")

❌ No Timeout on Async Operations

python
# BAD: Can hang forever
result = await fetch_data(url)

# GOOD: Always use timeout
result = await asyncio.wait_for(fetch_data(url), timeout=10.0)

❌ Not Re-raising CancelledError

python
# BAD: Suppresses cancellation
async def bad_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled")
        # Missing: raise

# GOOD: Re-raise to propagate cancellation
async def good_task():
    try:
        await long_operation()
    except asyncio.CancelledError:
        print("Cancelled, cleaning up...")
        # Cleanup
        raise  # Re-raise

❌ Using Blocking Code in Async Functions

python
# BAD: Blocks the event loop
async def bad_async():
    time.sleep(10)  # Blocking!
    return "done"

# GOOD: Use async sleep
async def good_async():
    await asyncio.sleep(10)  # Non-blocking
    return "done"

❌ Not Tracking Background Tasks

python
# BAD: Task might be garbage collected
async def bad_background():
    asyncio.create_task(background_work())  # Fire and forget - dangerous!

# GOOD: Track the task
async def good_background():
    task = asyncio.create_task(background_work())
    # Store task reference or await it later
    return task

When to Use This Skill

Activate this skill when:

  • Implementing async API clients or server handlers
  • Creating concurrent data fetching operations
  • Building async context managers
  • Implementing retry logic for async operations
  • Working with async generators or streaming data
  • Managing background tasks with proper lifecycle

Related Resources

For additional patterns, see: