This guide explains how to integrate FastAPI, MCP, and other backend services with the RoboLearn Auth Server using OAuth 2.0/OIDC.
- Registering as Confidential Client
- OAuth Authorization Code Flow
- Verifying Tokens (JWKS)
- Enforcing Authorization
- Client Credentials Grant (Machine-to-Machine) 5b. API Key Authentication (Machine-to-Machine)
- Production Considerations
Backend services should register as confidential clients (services that can securely store a client secret).
Endpoint: POST /api/admin/clients/register
Authentication: Requires admin session (admin user must be logged in)
Request:
curl -X POST http://localhost:3001/api/admin/clients/register \
-H "Content-Type: application/json" \
-H "Cookie: robolearn_session=<admin-session-cookie>" \
-d '{
"name": "My Backend Service",
"redirectUrls": ["https://api.example.com/auth/callback"],
"scope": "openid profile email",
"clientType": "confidential",
"skipConsent": false
}'Response:
{
"success": true,
"client_id": "generated-client-id",
"client_secret": "generated-client-secret",
"client_type": "confidential",
"name": "My Backend Service",
"redirect_uris": ["https://api.example.com/auth/callback"]
}- Store
client_secretsecurely (environment variable, secrets manager, never in source code) - The client secret is only shown once during registration
- Admin authentication is required to prevent unauthorized client registration
After registration, configure your backend service:
# OAuth Configuration
OAUTH_CLIENT_ID=generated-client-id
OAUTH_CLIENT_SECRET=generated-client-secret
OAUTH_AUTH_URL=http://localhost:3001
OAUTH_REDIRECT_URI=https://api.example.com/auth/callback
OAUTH_ISSUER=http://localhost:3001| Type | Client Secret | PKCE | Use Case |
|---|---|---|---|
| Public | ❌ No secret | ✅ Required | SPAs, mobile apps, CLI tools |
| Confidential | ✅ Has secret | Backend services, server-side apps |
The Authorization Code Flow is the standard OAuth 2.0 flow for applications that can securely store credentials.
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐
│ User │────▶│ Auth Server │────▶│ Your Backend │
│ (Browser) │ │ (Better Auth)│ │ (FastAPI) │
└─────────────┘ └──────────────┘ └─────────────────┘
│ │ │
│ 1. Redirect │ │
│ /authorize │ │
│───────────────────▶│ │
│ │ 2. User Login │
│ │ (if needed) │
│ │ │
│ 3. Redirect back │ │
│ with code │ │
│◀───────────────────│ │
│ │ │
│ 4. Send code to │ │
│ backend │ │
│────────────────────────────────────────▶│
│ │ │
│ │ 5. Exchange code │
│ │ + client_secret │
│ │◀─────────────────────│
│ │ │
│ │ 6. Return tokens │
│ │─────────────────────▶│
│ │ │
When a user needs to authenticate, redirect them to:
GET /api/auth/oauth2/authorize?
client_id={CLIENT_ID}&
redirect_uri={REDIRECT_URI}&
response_type=code&
scope=openid%20profile%20email&
state={RANDOM_STATE}&
code_challenge={CODE_CHALLENGE}&
code_challenge_method=S256
Required Parameters:
client_id: Your registered client IDredirect_uri: Must match registered redirect URI exactlyresponse_type: Alwayscodefor authorization code flowscope: Requested scopes (e.g.,openid profile email)state: Random string for CSRF protection
PKCE Parameters (optional for confidential clients, required for public):
code_challenge: SHA256 hash of code_verifiercode_challenge_method:S256(SHA-256)
Python/FastAPI Example:
import secrets
import hashlib
import base64
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
app = FastAPI()
OAUTH_AUTH_URL = "http://localhost:3001"
CLIENT_ID = "your-client-id"
REDIRECT_URI = "https://api.example.com/auth/callback"
@app.get("/login")
async def login(request: Request):
"""Initiate OAuth login flow"""
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
request.session['oauth_state'] = state
# Optional: Generate PKCE values for enhanced security
code_verifier = base64.urlsafe_b64encode(
secrets.token_bytes(32)
).decode('utf-8').rstrip('=')
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode('utf-8').rstrip('=')
# Store verifier for later
request.session['code_verifier'] = code_verifier
# Build authorization URL
params = {
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"response_type": "code",
"scope": "openid profile email",
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256"
}
auth_url = f"{OAUTH_AUTH_URL}/api/auth/oauth2/authorize?" + "&".join(
f"{k}={v}" for k, v in params.items()
)
return RedirectResponse(url=auth_url)After user authenticates, they're redirected back with an authorization code:
GET /auth/callback?code={AUTHORIZATION_CODE}&state={STATE}
FastAPI Example:
from fastapi import HTTPException
@app.get("/auth/callback")
async def callback(request: Request, code: str, state: str):
"""Handle OAuth callback"""
# 1. Verify state matches (CSRF protection)
stored_state = request.session.get('oauth_state')
if state != stored_state:
raise HTTPException(status_code=400, detail="Invalid state - CSRF detected")
# 2. Exchange code for tokens
code_verifier = request.session.get('code_verifier')
token_response = await exchange_code_for_tokens(
code=code,
code_verifier=code_verifier # Optional for confidential clients
)
# 3. Store tokens securely
request.session['access_token'] = token_response['access_token']
request.session['refresh_token'] = token_response['refresh_token']
request.session['id_token'] = token_response['id_token']
# 4. Clean up
request.session.pop('oauth_state', None)
request.session.pop('code_verifier', None)
# 5. Redirect to app
return RedirectResponse(url="/dashboard")For Confidential Clients (with client secret):
Use HTTP Basic Authentication with client credentials:
import httpx
import base64
async def exchange_code_for_tokens(
code: str,
code_verifier: str = None
) -> dict:
"""
Exchange authorization code for tokens.
For confidential clients, uses HTTP Basic Auth.
For public clients, includes PKCE code_verifier.
"""
token_url = f"{OAUTH_AUTH_URL}/api/auth/oauth2/token"
# Build request data
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": CLIENT_ID,
}
# Add PKCE verifier if using PKCE
if code_verifier:
data["code_verifier"] = code_verifier
# For confidential clients: HTTP Basic Auth
# Format: Authorization: Basic base64(client_id:client_secret)
auth = None
if CLIENT_SECRET: # Confidential client
auth = (CLIENT_ID, CLIENT_SECRET) # httpx handles Basic Auth encoding
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
data=data,
auth=auth # HTTP Basic Auth for confidential clients
)
response.raise_for_status()
return response.json()Token Response:
{
"access_token": "eyJhbGciOiJSUzI1NiIs...",
"token_type": "Bearer",
"expires_in": 21600,
"refresh_token": "eyJhbGciOiJSUzI1NiIs...",
"id_token": "eyJhbGciOiJSUzI1NiIs...",
"scope": "openid profile email"
}For backend-to-backend flows where you already have user credentials:
# NOT RECOMMENDED: Only for trusted first-party backends
# Use Authorization Code Flow with user consent instead
async def direct_token_exchange(username: str, password: str):
"""
Direct password grant (if enabled).
WARNING: Only use for highly trusted first-party apps.
"""
token_url = f"{OAUTH_AUTH_URL}/api/auth/oauth2/token"
response = await httpx.AsyncClient().post(
token_url,
data={
"grant_type": "password", # Requires server configuration
"username": username,
"password": password,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"scope": "openid profile email"
},
auth=(CLIENT_ID, CLIENT_SECRET)
)
return response.json()The auth server signs tokens with RS256 (RSA with SHA-256). Your backend can verify tokens using the public keys from the JWKS endpoint.
Benefits:
- ✅ No shared secrets needed
- ✅ Verify tokens without database calls
- ✅ Standard OIDC compliance
- ✅ Scales horizontally
JWKS Endpoint: /api/auth/jwks
import httpx
import time
from jose import jwt, jwk
from jose.backends import RSAKey
# Configuration
JWKS_URL = f"{OAUTH_AUTH_URL}/api/auth/jwks"
ISSUER = OAUTH_AUTH_URL
# Cache JWKS for performance
_jwks_cache: dict = {}
_jwks_cache_time: float = 0
JWKS_CACHE_TTL = 3600 # 1 hour
async def get_jwks() -> dict:
"""Fetch and cache JWKS from auth server"""
global _jwks_cache, _jwks_cache_time
current_time = time.time()
if _jwks_cache and (current_time - _jwks_cache_time) < JWKS_CACHE_TTL:
return _jwks_cache
async with httpx.AsyncClient() as client:
response = await client.get(JWKS_URL)
response.raise_for_status()
_jwks_cache = response.json()
_jwks_cache_time = current_time
return _jwks_cacheJWKS Response:
{
"keys": [
{
"kty": "RSA",
"kid": "key-id-12345",
"alg": "RS256",
"use": "sig",
"n": "base64url-encoded-modulus",
"e": "AQAB"
}
]
}from jose import jwt, JWTError
from pydantic import BaseModel
from typing import Optional
class TokenPayload(BaseModel):
"""Token claims from the auth server"""
sub: str # User ID
email: Optional[str] = None
name: Optional[str] = None
role: Optional[str] = "user"
tenant_id: Optional[str] = None
organization_ids: Optional[list[str]] = []
org_role: Optional[str] = None
software_background: Optional[str] = None
hardware_tier: Optional[str] = None
exp: int
iat: int
iss: str
aud: Optional[str] = None
async def verify_token(token: str) -> TokenPayload:
"""
Verify JWT token signature and claims.
Steps:
1. Get key ID (kid) from token header
2. Fetch matching public key from JWKS
3. Verify signature
4. Validate claims (exp, iss, aud)
"""
try:
# Get token header to find key ID
header = jwt.get_unverified_header(token)
kid = header.get('kid')
# Get JWKS
jwks = await get_jwks()
# Find matching key
public_key = None
for key in jwks.get('keys', []):
if key.get('kid') == kid:
public_key = key
break
if not public_key:
# Key not found - refresh cache and retry once
_jwks_cache.clear()
jwks = await get_jwks()
for key in jwks.get('keys', []):
if key.get('kid') == kid:
public_key = key
break
if not public_key:
raise JWTError("No matching key found")
# Verify token
payload = jwt.decode(
token,
public_key,
algorithms=['RS256'],
issuer=ISSUER,
options={"verify_aud": False} # Set to True if audience is configured
)
return TokenPayload(**payload)
except JWTError as e:
raise HTTPException(
status_code=401,
detail=f"Token verification failed: {str(e)}"
)from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> TokenPayload:
"""
Dependency to get the current authenticated user.
Usage:
@app.get("/api/me")
async def get_me(user: TokenPayload = Depends(get_current_user)):
return {"user_id": user.sub}
"""
token = credentials.credentials
return await verify_token(token)@app.get("/api/me")
async def get_me(user: TokenPayload = Depends(get_current_user)):
"""Get current user info - requires authentication"""
return {
"user_id": user.sub,
"email": user.email,
"name": user.name,
"role": user.role,
"tenant_id": user.tenant_id,
"software_background": user.software_background,
"hardware_tier": user.hardware_tier,
}
@app.get("/api/protected")
async def protected_endpoint(user: TokenPayload = Depends(get_current_user)):
"""Any authenticated user can access"""
return {"message": f"Hello {user.name}"}| Claim | Type | Description |
|---|---|---|
sub |
string | User ID (unique identifier) |
email |
string | User's email address |
name |
string | User's display name |
role |
string | User role: "user" or "admin" |
tenant_id |
string|null | Primary organization ID |
organization_ids |
string[] | All organizations user belongs to |
org_role |
string|null | Role in primary organization |
software_background |
string|null | "beginner", "intermediate", "advanced" |
hardware_tier |
string|null | "tier1", "tier2", "tier3", "tier4" |
email_verified |
boolean | Whether email is verified |
exp |
int | Token expiration timestamp |
iat |
int | Token issued at timestamp |
iss |
string | Token issuer (auth server URL) |
System Roles:
user: Default role for all usersadmin: Administrative access
async def require_role(required_roles: list[str]):
"""Factory for role-based access control"""
async def role_checker(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
if user.role not in required_roles:
raise HTTPException(
status_code=403,
detail=f"Role '{user.role}' not authorized. Required: {required_roles}"
)
return user
return role_checker
# Usage
@app.get("/api/admin/users")
async def list_users(user: TokenPayload = Depends(require_role(["admin"]))):
"""Admin-only endpoint"""
return {"message": "You are an admin", "admin_id": user.sub}Multi-tenant data isolation:
async def require_tenant(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
"""Require user to belong to a tenant/organization"""
if not user.tenant_id:
raise HTTPException(
status_code=403,
detail="User must belong to an organization"
)
return user
# Usage
@app.get("/api/tenant/resources")
async def get_tenant_resources(
user: TokenPayload = Depends(require_tenant),
db: Session = Depends(get_db)
):
"""Get resources scoped to user's tenant"""
resources = db.query(Resource).filter(
Resource.tenant_id == user.tenant_id
).all()
return resources
@app.post("/api/tenant/resources")
async def create_resource(
data: ResourceCreate,
user: TokenPayload = Depends(require_tenant),
db: Session = Depends(get_db)
):
"""Create resource scoped to user's tenant"""
resource = Resource(
**data.dict(),
tenant_id=user.tenant_id, # Automatic tenant isolation
created_by=user.sub
)
db.add(resource)
db.commit()
return resourceasync def require_org_admin(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
"""Require organization admin or owner role"""
if user.org_role not in ["owner", "admin"]:
raise HTTPException(
status_code=403,
detail="Organization admin access required"
)
return user
@app.post("/api/org/settings")
async def update_org_settings(
settings: OrgSettings,
user: TokenPayload = Depends(require_org_admin)
):
"""Only org admins/owners can update settings"""
# Update organization settings...
return {"success": True}async def require_scope(required_scope: str):
"""Decorator to require specific OAuth scope"""
async def scope_checker(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
# Note: Scopes are in the token but not in our TokenPayload model
# You may need to add a 'scope' field to TokenPayload
scopes = getattr(user, 'scope', '').split(' ')
if required_scope not in scopes:
raise HTTPException(
status_code=403,
detail=f"Requires scope: {required_scope}"
)
return user
return scope_checker
@app.get("/users/profile")
async def get_profile(user: TokenPayload = Depends(require_scope("profile"))):
"""Requires 'profile' scope"""
return {"profile": {...}}Control access based on user's hardware capabilities:
TIER_LEVELS = {"tier1": 1, "tier2": 2, "tier3": 3, "tier4": 4}
async def require_hardware_tier(min_tier: str):
"""Require minimum hardware tier"""
async def tier_checker(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
user_tier = TIER_LEVELS.get(user.hardware_tier or "tier1", 1)
required_tier = TIER_LEVELS.get(min_tier, 1)
if user_tier < required_tier:
raise HTTPException(
status_code=403,
detail=f"This content requires {min_tier}. You have {user.hardware_tier or 'tier1'}."
)
return user
return tier_checker
@app.get("/api/content/{content_id}")
async def get_content(
content_id: str,
user: TokenPayload = Depends(require_hardware_tier("tier2"))
):
"""Content requiring at least tier2 (RTX GPU)"""
return {"content": "Advanced GPU-accelerated content"}from functools import wraps
async def require_admin_with_tenant(
user: TokenPayload = Depends(get_current_user)
) -> TokenPayload:
"""Require both admin role AND tenant membership"""
if user.role != "admin":
raise HTTPException(status_code=403, detail="Admin role required")
if not user.tenant_id:
raise HTTPException(status_code=403, detail="Organization membership required")
return user
@app.get("/api/admin/tenant/analytics")
async def get_tenant_analytics(
user: TokenPayload = Depends(require_admin_with_tenant)
):
"""Admin users viewing their organization's analytics"""
return {"analytics": "data for tenant", "tenant_id": user.tenant_id}For headless/backend-to-backend authentication (machine-to-machine), use the Client Credentials grant type.
Status: ✅ Fully Supported - Available for all confidential clients registered via /api/admin/clients/register
Client Credentials flow is for services authenticating themselves (not on behalf of a user):
┌─────────────────┐ ┌──────────────┐
│ Your Backend │ │ Auth Server │
│ (FastAPI) │ │ (Better Auth)│
└─────────────────┘ └──────────────┘
│ │
│ 1. POST /oauth2/token │
│ grant_type=client_credentials │
│ + HTTP Basic Auth │
│────────────────────────────────────▶│
│ │
│ 2. Verify client │
│ credentials │
│ │
│ 3. Return access_token │
│◀────────────────────────────────────│
│ │
│ 4. Use token to call APIs │
│ │
import httpx
import os
from typing import Dict, Optional
# Configuration
OAUTH_AUTH_URL = os.getenv("OAUTH_AUTH_URL", "http://localhost:3001")
CLIENT_ID = os.getenv("OAUTH_CLIENT_ID")
CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET")
async def get_service_token(scope: str = "api:read api:write") -> Dict[str, str]:
"""
Machine-to-machine authentication using Client Credentials Grant.
No user context - service acts on its own behalf.
Args:
scope: Space-separated list of scopes to request
Returns:
Dictionary with access_token, token_type, expires_in
"""
token_url = f"{OAUTH_AUTH_URL}/api/auth/oauth2/token"
async with httpx.AsyncClient() as client:
response = await client.post(
token_url,
data={
"grant_type": "client_credentials",
"scope": scope
},
auth=(CLIENT_ID, CLIENT_SECRET) # HTTP Basic Auth
)
response.raise_for_status()
return response.json()
# Usage example
async def background_job():
"""Example: Background job that needs API access"""
# Get service token
token_data = await get_service_token(scope="api:read api:write")
access_token = token_data["access_token"]
# Use token to call APIs
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.example.com/protected",
headers={"Authorization": f"Bearer {access_token}"}
)
return response.json()import axios from 'axios';
interface TokenResponse {
access_token: string;
token_type: string;
expires_in: number;
scope: string;
}
async function getServiceToken(scope: string = 'api:read api:write'): Promise<TokenResponse> {
const tokenUrl = `${process.env.OAUTH_AUTH_URL}/api/auth/oauth2/token`;
const response = await axios.post(
tokenUrl,
new URLSearchParams({
grant_type: 'client_credentials',
scope: scope
}),
{
auth: {
username: process.env.OAUTH_CLIENT_ID!,
password: process.env.OAUTH_CLIENT_SECRET!
},
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
}
}
);
return response.data;
}
// Usage
async function backgroundJob() {
const { access_token } = await getServiceToken();
const apiResponse = await axios.get('https://api.example.com/protected', {
headers: { Authorization: `Bearer ${access_token}` }
});
return apiResponse.data;
}# Request service token
curl -X POST http://localhost:3001/api/auth/oauth2/token \
-u "your-client-id:your-client-secret" \
-d "grant_type=client_credentials" \
-d "scope=api:read api:write"
# Response
{
"access_token": "eyJhbGc...",
"token_type": "Bearer",
"expires_in": 21600,
"scope": "api:read api:write"
}
# Use the token
curl https://api.example.com/protected \
-H "Authorization: Bearer eyJhbGc..."Client credentials tokens should be cached and reused until expiration:
from datetime import datetime, timedelta
from typing import Optional
class ServiceTokenManager:
"""Cache and manage service tokens"""
def __init__(self):
self._token: Optional[str] = None
self._expires_at: Optional[datetime] = None
async def get_token(self, scope: str = "api:read api:write") -> str:
"""Get cached token or fetch new one if expired"""
# Return cached token if still valid
if self._token and self._expires_at:
if datetime.now() < self._expires_at - timedelta(minutes=5):
return self._token
# Fetch new token
token_data = await get_service_token(scope)
self._token = token_data["access_token"]
self._expires_at = datetime.now() + timedelta(seconds=token_data["expires_in"])
return self._token
# Global instance
token_manager = ServiceTokenManager()
# Usage
async def call_api():
token = await token_manager.get_token()
# Use token...Client credentials tokens are standard JWTs - verify them the same way as authorization code tokens (see Section 3):
from fastapi import Depends
@app.get("/api/service-endpoint")
async def service_endpoint(
user: TokenPayload = Depends(get_current_user)
):
"""
Endpoint accepting both user tokens and service tokens.
Service tokens will have:
- sub: client_id (not user ID)
- scope: client-defined scopes
- No user-specific claims (name, email, etc.)
"""
if user.sub.startswith("client-"): # Check if service token
return {"message": "Service-to-service call", "client": user.sub}
else:
return {"message": "User call", "user_id": user.sub}✅ Perfect for:
- Background jobs accessing APIs
- Service-to-service communication
- Cron jobs that don't act on behalf of a user
- Internal microservices
- Data sync processes
❌ Not suitable for:
- User-facing applications (use Authorization Code Flow)
- Actions that require user context/permissions
- Consent screens or user profile access
For simpler machine-to-machine authentication without OAuth complexity, use API Keys. API Keys are ideal for services that need a static credential without token refresh flows.
Status: ✅ Fully Supported - Create keys via admin dashboard or API
┌─────────────────┐ ┌──────────────┐
│ External │ │ Auth Server │
│ Service │ │ (SSO) │
│ (FastAPI/Node) │ │ │
└─────────────────┘ └──────────────┘
│ │
│ 1. POST /api/api-key/verify │
│ body: {"key": "pana_xxx..."} │
│────────────────────────────────────▶│
│ │
│ 2. Hash key, │
│ lookup in DB │
│ │
│ 3. Return validation result │
│ {valid: true, key: {...}} │
│◀────────────────────────────────────│
│ │
API keys are created by admins via the dashboard or API:
Dashboard: /admin/service-keys - Create, revoke, delete keys
API (requires admin session):
curl -X POST http://localhost:3001/api/auth/api-key/create \
-H "Content-Type: application/json" \
-H "Cookie: robolearn_session=<admin-session>" \
-d '{
"name": "ai-native-service",
"expiresIn": 2592000
}'
# Response (SAVE THIS KEY - cannot be retrieved later!)
{
"key": "pana_lniZlImAbPDnAWURoySDCKNVDfiPuRXxkNNYLCBpXTKHitxlUXqcKtWuaFMWIbqI",
"id": "6bo876z0qVVKew66bzXdguuRqsii4YwQ",
"name": "ai-native-service",
"expiresAt": "2026-01-03T15:33:38.440Z"
}Endpoint: POST /api/api-key/verify
This endpoint is public (no authentication required) - designed for external services to validate incoming API keys.
import httpx
from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader
# Configuration
SSO_URL = "http://localhost:3001" # Your SSO server URL
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def verify_api_key(api_key: str) -> dict:
"""
Verify API key against the SSO server.
Returns:
dict with key info if valid, raises HTTPException if invalid
"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{SSO_URL}/api/api-key/verify",
json={"key": api_key}
)
data = response.json()
if not data.get("valid"):
error = data.get("error", {})
raise HTTPException(
status_code=401,
detail=f"Invalid API key: {error.get('message', 'Unknown error')}"
)
return data["key"]
async def get_api_key(api_key: str = Security(api_key_header)) -> dict:
"""FastAPI dependency for API key authentication"""
if not api_key:
raise HTTPException(status_code=401, detail="API key required")
return await verify_api_key(api_key)
# Usage in routes
@app.get("/api/protected")
async def protected_endpoint(key_info: dict = Security(get_api_key)):
"""Protected by API key"""
return {
"message": "Authenticated via API key",
"key_name": key_info.get("name"),
"user_id": key_info.get("userId")
}import axios from 'axios';
const SSO_URL = process.env.SSO_URL || 'http://localhost:3001';
interface ApiKeyInfo {
id: string;
name: string;
userId: string;
enabled: boolean;
expiresAt: string | null;
metadata: Record<string, unknown> | null;
}
interface VerifyResponse {
valid: boolean;
key?: ApiKeyInfo;
error?: { code: string; message: string };
}
async function verifyApiKey(apiKey: string): Promise<ApiKeyInfo> {
const response = await axios.post<VerifyResponse>(
`${SSO_URL}/api/api-key/verify`,
{ key: apiKey }
);
if (!response.data.valid) {
throw new Error(response.data.error?.message || 'Invalid API key');
}
return response.data.key!;
}
// Express middleware
export async function apiKeyAuth(req, res, next) {
const apiKey = req.headers['x-api-key'];
if (!apiKey) {
return res.status(401).json({ error: 'API key required' });
}
try {
req.keyInfo = await verifyApiKey(apiKey);
next();
} catch (error) {
return res.status(401).json({ error: error.message });
}
}
// Usage
app.get('/api/protected', apiKeyAuth, (req, res) => {
res.json({
message: 'Authenticated via API key',
keyName: req.keyInfo.name,
userId: req.keyInfo.userId
});
});# Valid key
curl -X POST http://localhost:3001/api/api-key/verify \
-H "Content-Type: application/json" \
-d '{"key": "pana_lniZlImAbPDnAWURoySDCKNVDfiPuRXxkNNYLCBpXTKHitxlUXqcKtWuaFMWIbqI"}'
# Response (valid)
{
"valid": true,
"key": {
"id": "6bo876z0qVVKew66bzXdguuRqsii4YwQ",
"name": "ai-native",
"userId": "75U31gmiBIut08ODLsGAO837rmM0vIz0",
"enabled": true,
"expiresAt": "2026-01-03T15:33:38.440Z",
"metadata": null
}
}
# Invalid key
curl -X POST http://localhost:3001/api/api-key/verify \
-H "Content-Type: application/json" \
-d '{"key": "pana_invalid_key"}'
# Response (invalid)
{
"valid": false,
"error": {
"message": "Invalid API key.",
"code": "KEY_NOT_FOUND"
}
}| Code | HTTP Status | Description |
|---|---|---|
MISSING_KEY |
400 | No key provided in request body |
KEY_NOT_FOUND |
401 | API key doesn't exist or is invalid |
EXPIRED_API_KEY |
401 | API key has expired |
DISABLED_API_KEY |
401 | API key has been revoked/disabled |
INTERNAL_ERROR |
500 | Server error during verification |
| Feature | API Keys | Client Credentials |
|---|---|---|
| Complexity | Simple (single key) | Complex (OAuth flow) |
| Token Refresh | Not needed (static key) | Required (access tokens expire) |
| Revocation | Instant (disable key) | Until token expires |
| Rotation | Create new key, update client | Rotate client secret |
| Scopes | Via metadata | Standard OAuth scopes |
| User Context | Linked to key owner | Client-level only |
✅ Perfect for API Keys:
- External services calling your APIs (FastAPI, NestJS, MCP servers)
- GitHub Actions and CI/CD pipelines
- Partner integrations with simple auth needs
- Webhook receivers that need to validate authenticity
- Services where you want to track usage per key
❌ Use Client Credentials instead for:
- Internal microservices with fine-grained scopes
- Services that need token-based caching strategies
- OAuth ecosystem integration
DO:
- Store access tokens in memory only (short-lived)
- Use httpOnly cookies for refresh tokens
- Clear tokens on logout
DON'T:
- Store tokens in localStorage (XSS vulnerable)
- Log tokens in application logs
- Send tokens in URL parameters
# Recommended cache TTL
JWKS_CACHE_TTL = 3600 # 1 hour in production
# Refresh on kid mismatch
if kid not in cached_keys:
# Force refresh
_jwks_cache.clear()
jwks = await get_jwks()# Enforce HTTPS in production
import os
if os.getenv("ENV") == "production":
if not OAUTH_AUTH_URL.startswith("https://"):
raise ValueError("AUTH_URL must use HTTPS in production")Add rate limiting to prevent token verification abuse:
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.get("/api/protected")
@limiter.limit("100/minute")
async def protected_endpoint(
request: Request,
user: TokenPayload = Depends(get_current_user)
):
return {"data": "protected"}Implement token refresh before expiration:
async def refresh_access_token(refresh_token: str) -> dict:
"""Refresh access token using refresh token"""
token_url = f"{OAUTH_AUTH_URL}/api/auth/oauth2/token"
response = await httpx.AsyncClient().post(
token_url,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
},
auth=(CLIENT_ID, CLIENT_SECRET)
)
return response.json()from fastapi import Request
from fastapi.responses import JSONResponse
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Standardized error responses"""
if exc.status_code == 401:
return JSONResponse(
status_code=401,
content={
"error": "unauthorized",
"message": "Authentication required",
"detail": str(exc.detail)
}
)
if exc.status_code == 403:
return JSONResponse(
status_code=403,
content={
"error": "forbidden",
"message": "Insufficient permissions",
"detail": str(exc.detail)
}
)
return JSONResponse(
status_code=exc.status_code,
content={"error": "error", "detail": str(exc.detail)}
)import logging
logger = logging.getLogger(__name__)
async def get_current_user_with_logging(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> TokenPayload:
"""Enhanced version with audit logging"""
try:
user = await verify_token(credentials.credentials)
logger.info(f"User authenticated: {user.sub} ({user.email})")
return user
except HTTPException as e:
logger.warning(f"Authentication failed: {e.detail}")
raise# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# OAuth Configuration
oauth_client_id: str
oauth_client_secret: str
oauth_auth_url: str
oauth_redirect_uri: str
oauth_issuer: str
# JWKS Configuration
jwks_cache_ttl: int = 3600
# Security
enforce_https: bool = True
class Config:
env_file = ".env"
settings = Settings()@app.get("/health")
async def health_check():
"""Health check endpoint"""
try:
# Verify JWKS is accessible
jwks = await get_jwks()
jwks_healthy = bool(jwks.get("keys"))
except:
jwks_healthy = False
return {
"status": "healthy" if jwks_healthy else "degraded",
"checks": {
"jwks": "ok" if jwks_healthy else "error"
}
}- FastAPI Integration Guide - Detailed FastAPI examples
- JWT & JWKS Documentation - Token verification details
- RBAC & Scopes - Authorization patterns
- PKCE Flow - Public client authentication
- Multi-Tenancy - Tenant isolation patterns
- Troubleshooting - Common issues and solutions
| Endpoint | Method | Purpose |
|---|---|---|
/api/admin/clients/register |
POST | Register confidential client (admin auth) |
/api/auth/oauth2/authorize |
GET | Start OAuth flow (browser redirect) |
/api/auth/oauth2/token |
POST | Exchange code for tokens |
/api/auth/oauth2/userinfo |
GET | Get user info (requires access token) |
/api/auth/jwks |
GET | Public keys for token verification |
/.well-known/openid-configuration |
GET | OIDC discovery endpoint |
/api/api-key/verify |
POST | Verify API key (public, for M2M services) |
/api/auth/api-key/create |
POST | Create API key (requires admin session) |
/api/auth/api-key/list |
GET | List user's API keys (requires session) |
/api/auth/api-key/delete |
POST | Delete API key (requires session) |
/api/auth/api-key/update |
POST | Update/revoke API key (requires session) |
# Required
OAUTH_CLIENT_ID=your-client-id
OAUTH_CLIENT_SECRET=your-client-secret
OAUTH_AUTH_URL=https://auth.yourdomain.com
OAUTH_REDIRECT_URI=https://api.yourdomain.com/auth/callback
# Optional
JWKS_CACHE_TTL=3600| Token Type | Lifetime | Renewable |
|---|---|---|
| Authorization Code | 10 minutes | No (one-time use) |
| Access Token | 6 hours | Via refresh token |
| Refresh Token | 7 days | No (request new access token) |
| ID Token | 6 hours | N/A (informational only) |
For issues or questions:
- Check Troubleshooting Guide
- Review Better Auth Documentation
- File an issue on GitHub