Fastapi Development
FastAPI project structure, Router organization, Dependency injection, Pydantic v2 models, Background tasks, WebSocket endpoints, Middleware, CORS, OpenAPI customization
FastAPI Development Skill
Build production FastAPI applications with proper structure, dependency injection, Pydantic validation, and async patterns.
FastAPI Development Skill
Build production FastAPI applications with proper structure, dependency injection, Pydantic validation, and async patterns.
Process
Step 1: Project Structure
Organize FastAPI applications using domain-driven design:
# src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .core.config import get_settings
from .domains.users.router import router as users_router
from .domains.products.router import router as products_router
settings = get_settings()
app = FastAPI(
title=settings.api_title,
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(users_router)
app.include_router(products_router)
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": "1.0.0"}
Step 2: Router Organization
Group related endpoints using APIRouter:
# src/domains/users/router.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from ..core.dependencies import get_db, get_current_user
from .schemas import UserCreate, UserResponse, UserUpdate
from .service import UserService
from .models import User
router = APIRouter(prefix="/api/v1/users", tags=["users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""Create a new user."""
service = UserService(db)
try:
user = await service.create_user(user_data)
return user
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.get("/", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""List all users."""
service = UserService(db)
users = await service.list_users(skip=skip, limit=limit)
return users
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get user by ID."""
service = UserService(db)
user = await service.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_data: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user."""
service = UserService(db)
user = await service.update_user(user_id, user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete user."""
service = UserService(db)
success = await service.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
Step 3: Dependency Injection
Use FastAPI's dependency injection for database sessions, services, and authentication:
# src/core/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from jose import JWTError, jwt
from .database import get_async_session
from .config import get_settings
from ..domains.users.models import User
from sqlalchemy import select
security = HTTPBearer()
async def get_db() -> AsyncSession:
"""Dependency for database session."""
async with get_async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""Dependency for authenticated user."""
settings = get_settings()
try:
payload = jwt.decode(
credentials.credentials,
settings.secret_key,
algorithms=[settings.algorithm]
)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Dependency for active user."""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return current_user
Step 4: Pydantic v2 Models
Use Pydantic v2 for request/response validation:
# src/domains/users/schemas.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
email: EmailStr
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str = Field(..., min_length=8, description="Password must be at least 8 characters")
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserInDB(UserResponse):
hashed_password: str
Step 5: Background Tasks
Use FastAPI's BackgroundTasks for async operations:
# src/domains/users/router.py
from fastapi import BackgroundTasks
from .service import UserService
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""Create user and send welcome email in background."""
service = UserService(db)
user = await service.create_user(user_data)
# Add background task
background_tasks.add_task(send_welcome_email, user.email)
return user
async def send_welcome_email(email: str):
"""Send welcome email (runs in background)."""
# Email sending logic here
print(f"Sending welcome email to {email}")
Step 6: WebSocket Endpoints
Implement WebSocket endpoints for real-time communication:
# src/domains/chat/router.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List
import json
router = APIRouter(prefix="/ws", tags=["websocket"])
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@router.websocket("/chat/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = json.dumps({
"client_id": client_id,
"message": data
})
await manager.broadcast(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
Step 7: Middleware
Add custom middleware for logging, timing, and request processing:
# src/core/middleware.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import structlog
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Log request
logger.info(
"request_started",
method=request.method,
path=request.url.path,
client_host=request.client.host if request.client else None
)
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status_code=response.status_code,
process_time=process_time
)
response.headers["X-Process-Time"] = str(process_time)
return response
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.request_counts = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host if request.client else "unknown"
current_time = time.time()
# Clean old entries
self.request_counts = {
ip: times for ip, times in self.request_counts.items()
if any(t > current_time - 60 for t in times)
}
# Check rate limit
if client_ip in self.request_counts:
recent_requests = [t for t in self.request_counts[client_ip] if t > current_time - 60]
if len(recent_requests) >= self.requests_per_minute:
return Response(
content="Rate limit exceeded",
status_code=429
)
self.request_counts[client_ip] = recent_requests + [current_time]
else:
self.request_counts[client_ip] = [current_time]
response = await call_next(request)
return response
# In main.py
app.add_middleware(LoggingMiddleware)
app.add_middleware(RateLimitMiddleware, requests_per_minute=60)
Step 8: CORS Configuration
Configure CORS properly for production:
# src/core/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
cors_origins: list[str] = ["http://localhost:3000"]
cors_allow_credentials: bool = True
cors_allow_methods: list[str] = ["*"]
cors_allow_headers: list[str] = ["*"]
# In main.py
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=settings.cors_allow_methods,
allow_headers=settings.cors_allow_headers,
)
Step 9: OpenAPI Customization
Customize OpenAPI schema:
# src/main.py
from fastapi.openapi.utils import get_openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="My API",
version="1.0.0",
description="API documentation",
routes=app.routes,
)
openapi_schema["info"]["x-logo"] = {
"url": "https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .core.config import get_settings
from .domains.users.router import router as users_router
from .domains.products.router import router as products_router
settings = get_settings()
app = FastAPI(
title=settings.api_title,
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(users_router)
app.include_router(products_router)
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": "1.0.0"}
# src/domains/users/router.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from ..core.dependencies import get_db, get_current_user
from .schemas import UserCreate, UserResponse, UserUpdate
from .service import UserService
from .models import User
router = APIRouter(prefix="/api/v1/users", tags=["users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""Create a new user."""
service = UserService(db)
try:
user = await service.create_user(user_data)
return user
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.get("/", response_model=List[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""List all users."""
service = UserService(db)
users = await service.list_users(skip=skip, limit=limit)
return users
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get user by ID."""
service = UserService(db)
user = await service.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_data: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user."""
service = UserService(db)
user = await service.update_user(user_id, user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete user."""
service = UserService(db)
success = await service.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
# src/core/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from jose import JWTError, jwt
from .database import get_async_session
from .config import get_settings
from ..domains.users.models import User
from sqlalchemy import select
security = HTTPBearer()
async def get_db() -> AsyncSession:
"""Dependency for database session."""
async with get_async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""Dependency for authenticated user."""
settings = get_settings()
try:
payload = jwt.decode(
credentials.credentials,
settings.secret_key,
algorithms=[settings.algorithm]
)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Dependency for active user."""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return current_user
# src/domains/users/schemas.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
email: EmailStr
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str = Field(..., min_length=8, description="Password must be at least 8 characters")
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserInDB(UserResponse):
hashed_password: str
# src/domains/users/router.py
from fastapi import BackgroundTasks
from .service import UserService
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_data: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""Create user and send welcome email in background."""
service = UserService(db)
user = await service.create_user(user_data)
# Add background task
background_tasks.add_task(send_welcome_email, user.email)
return user
async def send_welcome_email(email: str):
"""Send welcome email (runs in background)."""
# Email sending logic here
print(f"Sending welcome email to {email}")
# src/domains/chat/router.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List
import json
router = APIRouter(prefix="/ws", tags=["websocket"])
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@router.websocket("/chat/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = json.dumps({
"client_id": client_id,
"message": data
})
await manager.broadcast(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
# src/core/middleware.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import structlog
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Log request
logger.info(
"request_started",
method=request.method,
path=request.url.path,
client_host=request.client.host if request.client else None
)
response = await call_next(request)
# Log response
process_time = time.time() - start_time
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status_code=response.status_code,
process_time=process_time
)
response.headers["X-Process-Time"] = str(process_time)
return response
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.request_counts = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host if request.client else "unknown"
current_time = time.time()
# Clean old entries
self.request_counts = {
ip: times for ip, times in self.request_counts.items()
if any(t > current_time - 60 for t in times)
}
# Check rate limit
if client_ip in self.request_counts:
recent_requests = [t for t in self.request_counts[client_ip] if t > current_time - 60]
if len(recent_requests) >= self.requests_per_minute:
return Response(
content="Rate limit exceeded",
status_code=429
)
self.request_counts[client_ip] = recent_requests + [current_time]
else:
self.request_counts[client_ip] = [current_time]
response = await call_next(request)
return response
# In main.py
app.add_middleware(LoggingMiddleware)
app.add_middleware(RateLimitMiddleware, requests_per_minute=60)
# src/core/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
cors_origins: list[str] = ["http://localhost:3000"]
cors_allow_credentials: bool = True
cors_allow_methods: list[str] = ["*"]
cors_allow_headers: list[str] = ["*"]
# In main.py
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=settings.cors_allow_methods,
allow_headers=settings.cors_allow_headers,
)
# src/main.py
from fastapi.openapi.utils import get_openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="My API",
version="1.0.0",
description="API documentation",
routes=app.routes,
)
openapi_schema["info"]["x-logo"] = {
"url": "https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
Best Practices
- •Use domain-driven structure (organize by feature, not layer)
- •Keep routers thin - delegate to service layer
- •Use dependency injection for all external resources
- •Separate Pydantic models from SQLAlchemy models
- •Use async for all I/O operations
- •Implement proper error handling with exception handlers
- •Use response models to limit exposed data
- •Add proper type hints throughout
- •Use Pydantic validators for complex validation
- •Implement rate limiting for public APIs
- •Use HTTPS in production
- •Configure CORS properly
- •Add health check endpoints
- •Use structured logging
- •Document endpoints with docstrings
Anti-Patterns
| Anti-Pattern | Fix |
|---|---|
| Business logic in routers | Move to service layer |
| Synchronous database calls | Use async database drivers |
| Missing validation | Use Pydantic models |
| Global state | Use dependency injection |
| Missing error handling | Add exception handlers |
| No response models | Use response_model parameter |
Related
- •Knowledge:
knowledge/fastapi-patterns.json - •Skill:
sqlalchemy-patternsfor database access - •Skill:
python-asyncfor async patterns
Prerequisites
[!IMPORTANT] Requirements:
- •Packages: fastapi, uvicorn[standard]
- •Knowledge: fastapi-patterns.json