AgentSkillsCN

Fastapi Patterns

Fastapi模式

SKILL.md

FastAPI Patterns Skill

Production-ready patterns for FastAPI backend development.

Project Structure

code
backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application
│   ├── api/
│   │   ├── __init__.py
│   │   ├── deps.py          # Dependencies
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── router.py    # API router
│   │       ├── auth.py
│   │       ├── chat.py
│   │       ├── speech.py
│   │       └── users.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py        # Settings
│   │   ├── database.py      # DB connection
│   │   └── security.py      # Auth utilities
│   ├── models/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── user.py
│   │   ├── conversation.py
│   │   └── message.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── conversation.py
│   │   └── message.py
│   ├── repositories/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── user.py
│   │   ├── conversation.py
│   │   └── message.py
│   └── services/
│       ├── __init__.py
│       ├── auth.py
│       ├── chat.py
│       ├── ai.py
│       ├── tts.py
│       └── stt.py
├── tests/
│   ├── conftest.py
│   ├── unit/
│   └── integration/
├── alembic/
│   ├── env.py
│   └── versions/
├── requirements.txt
├── pyproject.toml
└── Dockerfile

Configuration (config.py)

python
from pydantic_settings import BaseSettings
from typing import List
from functools import lru_cache

class Settings(BaseSettings):
    # Application
    app_name: str = "Chat Bot API"
    debug: bool = False
    api_v1_prefix: str = "/api/v1"

    # Database
    database_url: str
    database_pool_size: int = 20
    database_max_overflow: int = 30

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # CORS
    allowed_origins: str = "http://localhost:3000"

    # AI Services
    anthropic_api_key: str | None = None
    openai_api_key: str | None = None
    elevenlabs_api_key: str | None = None

    # Redis
    redis_url: str = "redis://localhost:6379"

    @property
    def cors_origins(self) -> List[str]:
        return [origin.strip() for origin in self.allowed_origins.split(",")]

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

@lru_cache
def get_settings() -> Settings:
    return Settings()

settings = get_settings()

Database (database.py)

python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from typing import AsyncGenerator
from app.core.config import settings

class Database:
    def __init__(self):
        self.engine = create_async_engine(
            settings.database_url,
            pool_size=settings.database_pool_size,
            max_overflow=settings.database_max_overflow,
            pool_pre_ping=True,
            echo=settings.debug
        )
        self.async_session_maker = async_sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        async with self.async_session_maker() as session:
            try:
                yield session
            except Exception:
                await session.rollback()
                raise

    async def close(self):
        await self.engine.dispose()

database = Database()

Base Repository (repositories/base.py)

python
from typing import Generic, TypeVar, Optional, List, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, func
from uuid import UUID

T = TypeVar('T')

class BaseRepository(Generic[T]):
    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get_by_id(self, id: UUID) -> Optional[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id == id)
        )
        return result.scalar_one_or_none()

    async def get_all(
        self,
        skip: int = 0,
        limit: int = 100,
        order_by: str = "created_at"
    ) -> List[T]:
        order_column = getattr(self.model, order_by, self.model.created_at)
        result = await self.session.execute(
            select(self.model)
            .order_by(order_column.desc())
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)
        return instance

    async def update(self, id: UUID, **kwargs) -> Optional[T]:
        data = {k: v for k, v in kwargs.items() if v is not None}
        if not data:
            return await self.get_by_id(id)

        await self.session.execute(
            update(self.model)
            .where(self.model.id == id)
            .values(**data)
        )
        await self.session.commit()
        return await self.get_by_id(id)

    async def delete(self, id: UUID) -> bool:
        result = await self.session.execute(
            delete(self.model).where(self.model.id == id)
        )
        await self.session.commit()
        return result.rowcount > 0

    async def count(self, **filters) -> int:
        query = select(func.count(self.model.id))
        for key, value in filters.items():
            if hasattr(self.model, key):
                query = query.where(getattr(self.model, key) == value)
        result = await self.session.execute(query)
        return result.scalar() or 0

Dependencies (api/deps.py)

python
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from jose import JWTError, jwt

from app.core.config import settings
from app.core.database import database
from app.models.user import User
from app.repositories.user import UserRepository

oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.api_v1_prefix}/auth/login")

async def get_db() -> AsyncSession:
    async for session in database.get_session():
        yield session

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)]
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user_repo = UserRepository(db)
    user = await user_repo.get_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

# Type aliases for cleaner endpoints
DbSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]

API Endpoint (api/v1/chat.py)

python
from fastapi import APIRouter, HTTPException, status
from typing import List
from uuid import UUID

from app.api.deps import DbSession, CurrentUser
from app.schemas.message import MessageCreate, MessageResponse
from app.services.chat import ChatService

router = APIRouter(prefix="/chat", tags=["chat"])

@router.post("/messages", response_model=MessageResponse)
async def send_message(
    message: MessageCreate,
    db: DbSession,
    current_user: CurrentUser
):
    chat_service = ChatService(db)
    return await chat_service.send_message(
        user_id=current_user.id,
        content=message.content,
        conversation_id=message.conversation_id
    )

@router.get("/conversations/{conversation_id}/messages", response_model=List[MessageResponse])
async def get_messages(
    conversation_id: UUID,
    db: DbSession,
    current_user: CurrentUser,
    skip: int = 0,
    limit: int = 50
):
    chat_service = ChatService(db)
    return await chat_service.get_messages(
        conversation_id=conversation_id,
        user_id=current_user.id,
        skip=skip,
        limit=limit
    )

Service Layer (services/chat.py)

python
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
from typing import List

from app.repositories.message import MessageRepository
from app.repositories.conversation import ConversationRepository
from app.models.message import Message
from app.services.ai import AIService

class ChatService:
    def __init__(self, session: AsyncSession):
        self.session = session
        self.message_repo = MessageRepository(session)
        self.conversation_repo = ConversationRepository(session)
        self.ai_service = AIService()

    async def send_message(
        self,
        user_id: UUID,
        content: str,
        conversation_id: UUID | None = None
    ) -> Message:
        # Create conversation if not exists
        if not conversation_id:
            conversation = await self.conversation_repo.create(
                user_id=user_id,
                title=content[:50]
            )
            conversation_id = conversation.id

        # Save user message
        user_message = await self.message_repo.create(
            content=content,
            role="user",
            conversation_id=conversation_id
        )

        # Get AI response
        history = await self.get_messages(conversation_id, user_id, limit=20)
        ai_response = await self.ai_service.generate_response(history)

        # Save AI message
        ai_message = await self.message_repo.create(
            content=ai_response,
            role="assistant",
            conversation_id=conversation_id
        )

        return ai_message

    async def get_messages(
        self,
        conversation_id: UUID,
        user_id: UUID,
        skip: int = 0,
        limit: int = 50
    ) -> List[Message]:
        return await self.message_repo.get_by_conversation(
            conversation_id=conversation_id,
            skip=skip,
            limit=limit
        )

Main Application (main.py)

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import settings
from app.core.database import database
from app.api.v1.router import api_router

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    yield
    # Shutdown
    await database.close()

app = FastAPI(
    title=settings.app_name,
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs" if settings.debug else None,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router, prefix=settings.api_v1_prefix)

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

requirements.txt

code
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
pydantic>=2.5.0
pydantic-settings>=2.1.0
sqlalchemy[asyncio]>=2.0.0
asyncpg>=0.29.0
alembic>=1.13.0
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.6
httpx>=0.26.0
redis>=5.0.0
anthropic>=0.18.0
openai>=1.10.0
pytest>=7.4.0
pytest-asyncio>=0.23.0