API Authentication Patterns
Comprehensive guide to implementing secure API authentication including JWT, OAuth 2.0, API keys, and session-based patterns. Covers when to use each approach, security best practices, and common vulnerabilities to avoid.
Quick Reference
When to use this skill:
- •Implementing API authentication
- •Choosing between auth strategies (JWT vs OAuth vs sessions)
- •Securing API endpoints
- •Implementing token refresh logic
- •Debugging authentication issues
- •Preventing auth vulnerabilities
Common triggers:
- •"How should I implement authentication"
- •"JWT vs OAuth vs API keys"
- •"How to secure this API"
- •"Implement refresh tokens"
- •"Store authentication tokens securely"
- •"Fix authentication vulnerability"
Prevents vulnerabilities:
- •Token theft and replay attacks
- •Insecure token storage
- •Missing token expiration
- •Weak password hashing
- •CSRF attacks
Part 1: Authentication Strategy Decision Matrix
When to Use Each Pattern
| Pattern | Best For | Pros | Cons |
|---|---|---|---|
| JWT | Stateless APIs, microservices, mobile apps | Stateless, scalable, works across domains | Tokens can't be revoked easily, larger payload |
| OAuth 2.0 | Third-party access, social login, delegation | Industry standard, fine-grained permissions | Complex to implement, requires authorization server |
| API Keys | Server-to-server, public APIs, rate limiting | Simple, great for service accounts | Not for users, can't be scoped easily |
| Sessions | Traditional web apps, SSR, same-domain | Revocable, server-controlled, secure | Requires server state, doesn't scale horizontally easily |
Decision Tree
START: What type of client? ├─ Mobile app or SPA? │ └─ Use JWT (stateless, works across domains) │ ├─ Third-party integration? │ └─ Use OAuth 2.0 (delegation, scoped permissions) │ ├─ Service-to-service? │ └─ Use API Keys (simple, rate-limitable) │ └─ Traditional web app (same domain)? └─ Use Sessions (revocable, server-controlled)
Part 2: JWT (JSON Web Tokens)
JWT Structure
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c [HEADER].[PAYLOAD].[SIGNATURE]
Header (algorithm and type):
{
"alg": "HS256",
"typ": "JWT"
}
Payload (claims):
{
"sub": "1234567890", // Subject (user ID)
"name": "John Doe", // Custom claim
"iat": 1516239022, // Issued at
"exp": 1516242622 // Expires at (required!)
}
Signature (verification):
HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), secret )
JWT Implementation (Python)
import jwt
import datetime
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
SECRET_KEY = "your-256-bit-secret" # Must be strong, from environment
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
security = HTTPBearer()
def create_access_token(user_id: int) -> str:
"""Create short-lived access token."""
expires = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": str(user_id),
"exp": expires,
"iat": datetime.datetime.utcnow(),
"type": "access"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: int) -> str:
"""Create long-lived refresh token."""
expires = datetime.datetime.utcnow() + datetime.timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
payload = {
"sub": str(user_id),
"exp": expires,
"iat": datetime.datetime.utcnow(),
"type": "refresh"
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
"""Verify and decode JWT token."""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "access":
raise HTTPException(status_code=401, detail="Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
# Login endpoint
@app.post("/login")
async def login(username: str, password: str):
user = authenticate_user(username, password) # Your auth logic
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(user.id)
refresh_token = create_refresh_token(user.id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
# Protected endpoint
@app.get("/protected")
async def protected_route(payload: dict = Depends(verify_token)):
user_id = payload["sub"]
return {"message": f"Hello user {user_id}"}
# Refresh token endpoint
@app.post("/refresh")
async def refresh(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid token type")
# Generate new access token
user_id = int(payload["sub"])
new_access_token = create_access_token(user_id)
return {"access_token": new_access_token, "token_type": "bearer"}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Refresh token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid refresh token")
JWT Security Best Practices
✅ Do:
- •Use strong secret keys (256-bit minimum)
- •Always set expiration (
expclaim) - •Use short-lived access tokens (15 minutes)
- •Use separate refresh tokens (7 days)
- •Store secret in environment variables
- •Use HTTPS only
- •Validate signature on every request
- •Check token type (
accessvsrefresh)
❌ Don't:
- •Store sensitive data in payload (it's base64, not encrypted!)
- •Use symmetric signing (HS256) for public APIs (use RS256)
- •Store tokens in localStorage (XSS vulnerability)
- •Skip expiration validation
- •Use same token for access and refresh
- •Hard-code secrets
Token Storage (Client-Side)
❌ Bad (localStorage - vulnerable to XSS):
localStorage.setItem('token', token); // XSS can steal this!
✅ Good (httpOnly cookie):
# Server sets httpOnly cookie
response.set_cookie(
key="access_token",
value=access_token,
httponly=True, # Not accessible via JavaScript
secure=True, # HTTPS only
samesite="lax", # CSRF protection
max_age=900 # 15 minutes
)
✅ Also Good (memory only for SPAs):
// Store in memory (lost on refresh, but more secure)
let accessToken = null;
async function login(username, password) {
const response = await fetch('/login', {
method: 'POST',
body: JSON.stringify({ username, password })
});
const data = await response.json();
accessToken = data.access_token; // Store in memory
}
Part 3: OAuth 2.0
OAuth 2.0 Flows
Authorization Code Flow (most common, for web apps):
1. Client → Authorization Server: "User wants to log in" 2. Authorization Server → User: Login page 3. User → Authorization Server: Credentials 4. Authorization Server → Client: Authorization code 5. Client → Authorization Server: Exchange code for access token 6. Authorization Server → Client: Access token + refresh token
Client Credentials Flow (for service-to-service):
1. Service → Authorization Server: Client ID + Secret 2. Authorization Server → Service: Access token
OAuth 2.0 Implementation (Authorization Code Flow)
from fastapi import FastAPI, HTTPException
from authlib.integrations.starlette_client import OAuth
import os
app = FastAPI()
oauth = OAuth()
oauth.register(
name='google',
client_id=os.getenv('GOOGLE_CLIENT_ID'),
client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)
@app.get('/login/google')
async def login_google(request: Request):
redirect_uri = request.url_for('auth_google')
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get('/auth/google')
async def auth_google(request: Request):
try:
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')
# Create or update user in your database
user = get_or_create_user(
email=user_info['email'],
name=user_info['name']
)
# Create your own JWT for subsequent requests
access_token = create_access_token(user.id)
return {"access_token": access_token, "token_type": "bearer"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
OAuth 2.0 Scopes
# Define scopes for your API
SCOPES = {
"read:posts": "Read posts",
"write:posts": "Create and edit posts",
"delete:posts": "Delete posts",
"read:profile": "Read user profile",
"write:profile": "Update user profile"
}
# Include scopes in JWT
def create_access_token(user_id: int, scopes: list[str]) -> str:
payload = {
"sub": str(user_id),
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=15),
"scopes": scopes # Add scopes to token
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# Check scopes in protected endpoint
def require_scopes(required_scopes: list[str]):
def decorator(func):
async def wrapper(payload: dict = Depends(verify_token)):
token_scopes = payload.get("scopes", [])
if not all(scope in token_scopes for scope in required_scopes):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(payload)
return wrapper
return decorator
@app.delete("/posts/{post_id}")
@require_scopes(["delete:posts"])
async def delete_post(post_id: int, payload: dict = Depends(verify_token)):
# User has delete:posts scope
pass
Part 4: API Keys
API Key Implementation
import secrets
import hashlib
from datetime import datetime
# Generate API key
def generate_api_key() -> tuple[str, str]:
"""Generate API key and return (key, hashed_key)."""
# Generate random 32-byte key
api_key = secrets.token_urlsafe(32)
# Hash for storage (never store plain key!)
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
return api_key, hashed_key
# Store in database
def create_api_key(user_id: int, name: str) -> str:
api_key, hashed_key = generate_api_key()
db.execute("""
INSERT INTO api_keys (user_id, name, key_hash, created_at)
VALUES (?, ?, ?, ?)
""", user_id, name, hashed_key, datetime.utcnow())
# Return plain key to user (only time they see it!)
return api_key
# Verify API key
def verify_api_key(api_key: str) -> dict:
"""Verify API key and return user info."""
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
result = db.execute("""
SELECT user_id, name, created_at, last_used_at
FROM api_keys
WHERE key_hash = ? AND revoked_at IS NULL
""", hashed_key).fetchone()
if not result:
raise HTTPException(status_code=401, detail="Invalid API key")
# Update last used timestamp
db.execute("""
UPDATE api_keys
SET last_used_at = ?
WHERE key_hash = ?
""", datetime.utcnow(), hashed_key)
return {"user_id": result[0], "key_name": result[1]}
# Middleware
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
@app.get("/api/data")
async def get_data(api_key: str = Security(api_key_header)):
user_info = verify_api_key(api_key)
return {"data": "protected data", "user_id": user_info["user_id"]}
API Key Best Practices
✅ Do:
- •Hash keys before storing (use SHA-256 minimum)
- •Generate cryptographically secure keys (
secretsmodule) - •Allow users to name keys ("Production Server", "CI/CD")
- •Track last used timestamp
- •Allow key revocation
- •Rate limit by API key
- •Log API key usage
❌ Don't:
- •Store plain text keys
- •Use predictable key generation
- •Expose keys in URLs (use headers)
- •Share keys across environments
API Key Revocation
@app.delete("/api-keys/{key_id}")
async def revoke_api_key(key_id: int, current_user: dict = Depends(get_current_user)):
db.execute("""
UPDATE api_keys
SET revoked_at = ?
WHERE id = ? AND user_id = ?
""", datetime.utcnow(), key_id, current_user["id"])
return {"message": "API key revoked"}
Part 5: Session-Based Authentication
Session Implementation
from fastapi import FastAPI, Cookie, Response
import redis
import secrets
import json
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def create_session(user_id: int) -> str:
"""Create session and return session ID."""
session_id = secrets.token_urlsafe(32)
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat()
}
# Store in Redis with 24-hour expiry
redis_client.setex(
f"session:{session_id}",
86400, # 24 hours
json.dumps(session_data)
)
return session_id
def verify_session(session_id: str) -> dict:
"""Verify session and return user data."""
session_data = redis_client.get(f"session:{session_id}")
if not session_data:
raise HTTPException(status_code=401, detail="Session expired")
return json.loads(session_data)
@app.post("/login")
async def login(username: str, password: str, response: Response):
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Create session
session_id = create_session(user.id)
# Set httpOnly cookie
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True,
samesite="lax",
max_age=86400 # 24 hours
)
return {"message": "Logged in successfully"}
@app.get("/protected")
async def protected_route(session_id: str = Cookie(None)):
if not session_id:
raise HTTPException(status_code=401, detail="Not authenticated")
session_data = verify_session(session_id)
user_id = session_data["user_id"]
return {"message": f"Hello user {user_id}"}
@app.post("/logout")
async def logout(session_id: str = Cookie(None), response: Response):
if session_id:
redis_client.delete(f"session:{session_id}")
response.delete_cookie("session_id")
return {"message": "Logged out successfully"}
Part 6: Password Security
Password Hashing (Never Store Plain Text!)
import bcrypt
def hash_password(password: str) -> str:
"""Hash password with bcrypt."""
salt = bcrypt.gensalt(rounds=12) # 12 rounds is good balance
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
# When creating user
@app.post("/register")
async def register(username: str, password: str):
# Validate password strength
if len(password) < 12:
raise HTTPException(status_code=400, detail="Password must be at least 12 characters")
# Hash password
hashed_password = hash_password(password)
# Store hashed password (NEVER plain text!)
db.execute("""
INSERT INTO users (username, password_hash)
VALUES (?, ?)
""", username, hashed_password)
return {"message": "User created"}
# When logging in
@app.post("/login")
async def login(username: str, password: str):
user = db.execute("SELECT id, password_hash FROM users WHERE username = ?", username).fetchone()
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Verify password
if not verify_password(password, user[1]):
raise HTTPException(status_code=401, detail="Invalid credentials")
# Create token/session
access_token = create_access_token(user[0])
return {"access_token": access_token}
Password Requirements
Minimum requirements:
- •At least 12 characters (NIST recommendation)
- •Mix of uppercase, lowercase, numbers, symbols
- •Not in common password list
- •Not similar to username
Implementation:
import re
def validate_password(password: str, username: str) -> tuple[bool, str]:
"""Validate password strength."""
if len(password) < 12:
return False, "Password must be at least 12 characters"
if not re.search(r"[a-z]", password):
return False, "Password must contain lowercase letter"
if not re.search(r"[A-Z]", password):
return False, "Password must contain uppercase letter"
if not re.search(r"\d", password):
return False, "Password must contain number"
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False, "Password must contain special character"
if username.lower() in password.lower():
return False, "Password cannot contain username"
# Check against common passwords
if is_common_password(password):
return False, "Password is too common"
return True, "Password is strong"
Part 7: Common Vulnerabilities
Vulnerability 1: Token Replay Attacks
Problem: Attacker intercepts token and reuses it
✅ Solution: Short expiration + refresh tokens
# Access token: 15 minutes
# Refresh token: 7 days, single-use
def refresh_tokens(refresh_token: str):
# Verify refresh token
payload = jwt.decode(refresh_token, SECRET_KEY)
# Check if already used (store used tokens in Redis)
if redis_client.get(f"used:{refresh_token}"):
raise HTTPException(status_code=401, detail="Token already used")
# Mark as used
redis_client.setex(f"used:{refresh_token}", 604800, "1") # 7 days
# Generate new tokens
user_id = int(payload["sub"])
new_access = create_access_token(user_id)
new_refresh = create_refresh_token(user_id)
return {"access_token": new_access, "refresh_token": new_refresh}
Vulnerability 2: CSRF Attacks
Problem: Attacker tricks user into making authenticated request
✅ Solution: CSRF tokens + SameSite cookies
from fastapi import Cookie, Header
def verify_csrf(
csrf_token: str = Header(None, alias="X-CSRF-Token"),
session_id: str = Cookie(None)
):
"""Verify CSRF token matches session."""
if not csrf_token:
raise HTTPException(status_code=403, detail="CSRF token missing")
session_data = verify_session(session_id)
stored_csrf = session_data.get("csrf_token")
if csrf_token != stored_csrf:
raise HTTPException(status_code=403, detail="Invalid CSRF token")
@app.post("/sensitive-action")
async def sensitive_action(
csrf_check: None = Depends(verify_csrf)
):
# Action protected from CSRF
pass
Vulnerability 3: Timing Attacks
Problem: Attacker uses response timing to guess credentials
✅ Solution: Constant-time comparison
import hmac
def constant_time_compare(a: str, b: str) -> bool:
"""Compare strings in constant time (prevents timing attacks)."""
return hmac.compare_digest(a, b)
# Use for password hash comparison
if not constant_time_compare(provided_hash, stored_hash):
raise HTTPException(status_code=401)
Part 8: Rate Limiting
from fastapi import Request
import time
# In-memory rate limiter (use Redis for production)
rate_limits = {}
def rate_limit(max_requests: int, window_seconds: int):
"""Rate limit decorator."""
def decorator(func):
async def wrapper(request: Request, *args, **kwargs):
client_ip = request.client.host
key = f"{client_ip}:{func.__name__}"
now = time.time()
if key not in rate_limits:
rate_limits[key] = []
# Remove old requests outside window
rate_limits[key] = [
req_time for req_time in rate_limits[key]
if now - req_time < window_seconds
]
# Check if over limit
if len(rate_limits[key]) >= max_requests:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {window_seconds} seconds."
)
# Add current request
rate_limits[key].append(now)
return await func(request, *args, **kwargs)
return wrapper
return decorator
@app.post("/login")
@rate_limit(max_requests=5, window_seconds=60) # 5 login attempts per minute
async def login(request: Request, username: str, password: str):
# Login logic
pass
Quick Security Checklist
Token Security:
- • Always use HTTPS (never HTTP)
- • Set token expiration (15 min for access, 7 days for refresh)
- • Store secrets in environment variables
- • Hash API keys before storing
- • Use httpOnly cookies for tokens
- • Never store sensitive data in JWT payload
Password Security:
- • Use bcrypt or argon2 for hashing
- • Enforce minimum 12 characters
- • Never store plain text passwords
- • Use constant-time comparison
- • Implement rate limiting on login
API Security:
- • Validate all inputs
- • Implement rate limiting
- • Use CORS restrictions
- • Add CSRF protection for state-changing operations
- • Log authentication events
- • Monitor for suspicious patterns
Resources
JWT:
- •JWT.io: https://jwt.io/
- •RFC 7519: https://tools.ietf.org/html/rfc7519
OAuth 2.0:
- •OAuth 2.0 RFC: https://tools.ietf.org/html/rfc6749
- •OAuth 2.0 Playground: https://www.oauth.com/playground/
Security:
- •OWASP Auth Cheatsheet: https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html
- •NIST Password Guidelines: https://pages.nist.gov/800-63-3/
Libraries:
- •PyJWT: https://pyjwt.readthedocs.io/
- •Authlib: https://docs.authlib.org/
- •Passlib: https://passlib.readthedocs.io/