Skip to content

Commit 6074dd8

Browse files
committed
Add missing csrf.py module required by router imports
The opi.utils.csrf module is imported by router.py and router_self_service.py but was never committed, causing pyright reportMissingImports errors in CI.
1 parent fdf6f66 commit 6074dd8

1 file changed

Lines changed: 168 additions & 0 deletions

File tree

  • operations-manager/python/opi/utils
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""
2+
CSRF (Cross-Site Request Forgery) protection utilities.
3+
4+
This module provides CSRF token generation and validation to protect against
5+
cross-site request forgery attacks. It works alongside Origin/Referer header
6+
validation to provide defense in depth.
7+
8+
Usage:
9+
# In route handler that renders forms:
10+
csrf_token = ensure_csrf_token(request)
11+
return templates.TemplateResponse("form.html", {"csrf_token": csrf_token})
12+
13+
# In route handler that processes form submission:
14+
validate_csrf_token(request) # Raises HTTPException if invalid
15+
"""
16+
17+
import logging
18+
import secrets
19+
from typing import Any
20+
21+
from fastapi import HTTPException, Request
22+
23+
logger = logging.getLogger(__name__)
24+
25+
# CSRF token configuration
26+
CSRF_TOKEN_LENGTH = 32 # 256 bits of entropy
27+
CSRF_TOKEN_SESSION_KEY = "_csrf_token"
28+
CSRF_TOKEN_HEADER_NAME = "X-CSRF-Token"
29+
CSRF_TOKEN_FORM_FIELD = "csrf_token"
30+
31+
32+
def generate_csrf_token() -> str:
33+
"""Generate a cryptographically secure CSRF token.
34+
35+
Returns:
36+
A hex-encoded random token with CSRF_TOKEN_LENGTH bytes of entropy.
37+
"""
38+
return secrets.token_hex(CSRF_TOKEN_LENGTH)
39+
40+
41+
def get_csrf_token_from_session(request: Request) -> str | None:
42+
"""Get the CSRF token from the session if it exists.
43+
44+
Args:
45+
request: The FastAPI/Starlette request object
46+
47+
Returns:
48+
The CSRF token string or None if not set
49+
"""
50+
return request.session.get(CSRF_TOKEN_SESSION_KEY)
51+
52+
53+
def ensure_csrf_token(request: Request) -> str:
54+
"""Ensure a CSRF token exists in the session, creating one if needed.
55+
56+
This should be called when rendering pages that contain forms.
57+
58+
Args:
59+
request: The FastAPI/Starlette request object
60+
61+
Returns:
62+
The CSRF token string (existing or newly generated)
63+
"""
64+
token = get_csrf_token_from_session(request)
65+
if not token:
66+
token = generate_csrf_token()
67+
request.session[CSRF_TOKEN_SESSION_KEY] = token
68+
logger.debug("Generated new CSRF token for session")
69+
return token
70+
71+
72+
def get_csrf_token_from_request(request: Request, form_data: dict[str, Any] | None = None) -> str | None:
73+
"""Extract the CSRF token from the request (header or form data).
74+
75+
Checks in order:
76+
1. X-CSRF-Token header (for AJAX requests)
77+
2. csrf_token form field (for regular form submissions)
78+
79+
Args:
80+
request: The FastAPI/Starlette request object
81+
form_data: Optional pre-parsed form data dictionary
82+
83+
Returns:
84+
The CSRF token string or None if not found
85+
"""
86+
# Check header first (for AJAX requests)
87+
header_token = request.headers.get(CSRF_TOKEN_HEADER_NAME)
88+
if header_token:
89+
return header_token.strip()
90+
91+
# Check form data if provided
92+
if form_data and CSRF_TOKEN_FORM_FIELD in form_data:
93+
form_token = form_data.get(CSRF_TOKEN_FORM_FIELD)
94+
if form_token:
95+
return str(form_token).strip()
96+
97+
return None
98+
99+
100+
def validate_csrf_token(
101+
request: Request,
102+
form_data: dict[str, Any] | None = None,
103+
raise_on_missing: bool = True,
104+
) -> bool:
105+
"""Validate the CSRF token from the request against the session token.
106+
107+
Args:
108+
request: The FastAPI/Starlette request object
109+
form_data: Optional pre-parsed form data dictionary
110+
raise_on_missing: If True, raise HTTPException when token is missing
111+
112+
Returns:
113+
True if token is valid
114+
115+
Raises:
116+
HTTPException: If token is missing (when raise_on_missing=True) or invalid
117+
"""
118+
session_token = get_csrf_token_from_session(request)
119+
request_token = get_csrf_token_from_request(request, form_data)
120+
121+
# Check if session has a token
122+
if not session_token:
123+
logger.warning("CSRF validation failed: No token in session")
124+
if raise_on_missing:
125+
raise HTTPException(
126+
status_code=403,
127+
detail="CSRF validation failed: session expired or invalid"
128+
)
129+
return False
130+
131+
# Check if request includes a token
132+
if not request_token:
133+
logger.warning("CSRF validation failed: No token in request")
134+
if raise_on_missing:
135+
raise HTTPException(
136+
status_code=403,
137+
detail="CSRF validation failed: missing token"
138+
)
139+
return False
140+
141+
# Constant-time comparison to prevent timing attacks
142+
if not secrets.compare_digest(session_token, request_token):
143+
logger.warning("CSRF validation failed: Token mismatch")
144+
raise HTTPException(
145+
status_code=403,
146+
detail="CSRF validation failed: invalid token"
147+
)
148+
149+
logger.debug("CSRF token validated successfully")
150+
return True
151+
152+
153+
def rotate_csrf_token(request: Request) -> str:
154+
"""Rotate the CSRF token after a successful state-changing operation.
155+
156+
This can be called after sensitive operations to prevent token reuse.
157+
It's optional but recommended for high-security operations.
158+
159+
Args:
160+
request: The FastAPI/Starlette request object
161+
162+
Returns:
163+
The new CSRF token
164+
"""
165+
new_token = generate_csrf_token()
166+
request.session[CSRF_TOKEN_SESSION_KEY] = new_token
167+
logger.debug("Rotated CSRF token")
168+
return new_token

0 commit comments

Comments
 (0)