Skip to content

Commit 596bbfc

Browse files
committed
middleware + updated dependencies
1 parent f44ee71 commit 596bbfc

File tree

3 files changed

+237
-8
lines changed

3 files changed

+237
-8
lines changed

backend/app/middleware/auth.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import logging
2+
from typing import List, Dict, Any
3+
4+
from fastapi import Depends, HTTPException, Request, status
5+
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
6+
7+
from ..utilities.constants import LOGGER_NAME
8+
from ..utilities.service_utils import get_auth_service
9+
10+
security = HTTPBearer()
11+
logger = logging.getLogger(LOGGER_NAME("auth"))
12+
13+
14+
def has_roles(required_roles: List[str]):
15+
"""
16+
FastAPI dependency that checks if the authenticated user has one of the required roles.
17+
18+
Args:
19+
required_roles: List of roles that can access the endpoint
20+
21+
Returns:
22+
A dependency that validates the user has one of the specified roles
23+
24+
Example:
25+
@app.get("/admin-only")
26+
async def admin_endpoint(authorized: bool = has_roles(["admin"])):
27+
return {"message": "You have admin access"}
28+
"""
29+
async def role_validator(
30+
request: Request,
31+
credentials: HTTPAuthorizationCredentials = Depends(security),
32+
auth_service = Depends(get_auth_service)
33+
) -> bool:
34+
# Get the token from authorization header
35+
token = credentials.credentials
36+
37+
# Use the auth service to check if the user has the required role
38+
is_authorized = auth_service.is_authorized_by_role(token, set(required_roles))
39+
40+
if not is_authorized:
41+
logger.warning(f"Access denied: user doesn't have required roles: {required_roles}")
42+
raise HTTPException(
43+
status_code=status.HTTP_403_FORBIDDEN,
44+
detail=f"Access denied: requires one of these roles: {required_roles}"
45+
)
46+
47+
return True
48+
49+
return Depends(role_validator)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import logging
2+
from typing import List
3+
import firebase_admin.auth
4+
from starlette.middleware.base import BaseHTTPMiddleware
5+
from starlette.requests import Request
6+
from starlette.responses import JSONResponse, Response
7+
from starlette.types import ASGIApp
8+
from app.utilities.constants import LOGGER_NAME
9+
10+
11+
class AuthMiddleware(BaseHTTPMiddleware):
12+
def __init__(self, app: ASGIApp, public_paths: List[str] = None):
13+
super().__init__(app)
14+
self.public_paths = public_paths or []
15+
self.logger = logging.getLogger(LOGGER_NAME("auth_middleware"))
16+
17+
def is_public_path(self, path: str) -> bool:
18+
return path in self.public_paths
19+
20+
async def dispatch(self, request: Request, call_next):
21+
22+
if self.is_public_path(request.url.path):
23+
self.logger.info(f"Skipping auth for public path: {request.url.path}")
24+
return await call_next(request)
25+
26+
# Get authentication token from header
27+
auth_header = request.headers.get("Authorization")
28+
if not auth_header or not auth_header.startswith("Bearer "):
29+
self.logger.warning(f"Missing or invalid auth header for {request.url.path}")
30+
return JSONResponse(
31+
status_code=401,
32+
content={"detail": "Authentication required"}
33+
)
34+
35+
token = auth_header.split(" ")[1]
36+
try:
37+
# Verify the token with Firebase
38+
self.logger.info(f"Verifying token for request to {request.url.path}")
39+
decoded_token = firebase_admin.auth.verify_id_token(token, check_revoked=True)
40+
41+
# Get Firebase user information
42+
firebase_user = firebase_admin.auth.get_user(decoded_token["uid"])
43+
44+
request.state.user_id = decoded_token["uid"]
45+
request.state.user_email = decoded_token.get("email")
46+
request.state.email_verified = firebase_user.email_verified
47+
request.state.user_claims = decoded_token.get("claims", {})
48+
49+
# Add complete user info for convenience
50+
request.state.user_info = {
51+
"uid": decoded_token["uid"],
52+
"email": decoded_token.get("email"),
53+
"name": decoded_token.get("name", ""),
54+
"picture": decoded_token.get("picture", ""),
55+
"email_verified": firebase_user.email_verified
56+
}
57+
58+
response = await call_next(request)
59+
60+
if isinstance(response, Response):
61+
response.headers["X-Auth-User-ID"] = decoded_token["uid"]
62+
63+
return response
64+
65+
except firebase_admin.auth.RevokedIdTokenError:
66+
self.logger.warning(f"Token has been revoked: {request.url.path}")
67+
return JSONResponse(
68+
status_code=401,
69+
content={"detail": "Token has been revoked. Please reauthenticate."}
70+
)
71+
except firebase_admin.auth.ExpiredIdTokenError:
72+
self.logger.warning(f"Token has expired: {request.url.path}")
73+
return JSONResponse(
74+
status_code=401,
75+
content={"detail": "Token has expired. Please reauthenticate."}
76+
)
77+
except firebase_admin.auth.InvalidIdTokenError as e:
78+
self.logger.warning(f"Invalid token: {request.url.path}, error: {str(e)}")
79+
return JSONResponse(
80+
status_code=401,
81+
content={"detail": "Invalid authentication token"}
82+
)
83+
except Exception as e:
84+
self.logger.error(f"Authentication error for {request.url.path}: {str(e)}")
85+
return JSONResponse(
86+
status_code=401,
87+
content={"detail": f"Authentication failed: {str(e)}"}
88+
)

backend/app/server.py

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from fastapi.middleware.cors import CORSMiddleware
88

99
from . import models
10-
from .middleware import AuthMiddleware
10+
from .middleware.auth_middleware import AuthMiddleware
1111
from .routes import auth, send_email, user
1212
from .utilities.constants import LOGGER_NAME
1313
from .utilities.firebase_init import initialize_firebase
@@ -73,7 +73,11 @@ def read_item(item_id: int, q: Union[str, None] = None):
7373
async def test_middleware(request: Request) -> Dict[str, Any]:
7474
"""
7575
Test endpoint that requires authentication and shows middleware-added state.
76-
This will only work if you provide a valid Firebase token.
76+
This will only work if you provide a valid Firebase token in the Authorization header.
77+
78+
Example: Authorization: Bearer your-firebase-token
79+
80+
The response will show all user information added by the Firebase auth middleware.
7781
"""
7882
# Get all the attributes from request.state
7983
state_dict = {}
@@ -83,20 +87,24 @@ async def test_middleware(request: Request) -> Dict[str, Any]:
8387
state_dict[key] = getattr(request.state, key)
8488

8589
return {
86-
"message": "Middleware test - Auth required",
90+
"message": "Authentication successful! User info from Firebase token:",
8791
"middleware_state": state_dict,
8892
"user_id": getattr(request.state, "user_id", None),
89-
"request_id": getattr(request.state, "request_id", None),
93+
"user_email": getattr(request.state, "user_email", None),
94+
"email_verified": getattr(request.state, "email_verified", None),
9095
"user_claims": getattr(request.state, "user_claims", None),
91-
"headers": dict(request.headers)
96+
"user_info": getattr(request.state, "user_info", None),
97+
"request_id": getattr(request.state, "request_id", None),
98+
"authorization_header": request.headers.get("Authorization", "Not provided")
9299
}
93100

94101

95102
@app.get("/test-middleware-public")
96103
async def test_middleware_public(request: Request) -> Dict[str, Any]:
97104
"""
98105
Public test endpoint that shows middleware-added state.
99-
This should work without authentication as it's in PUBLIC_PATHS.
106+
This endpoint is in PUBLIC_PATHS, so it works without authentication.
107+
No Firebase token is required to access this endpoint.
100108
"""
101109
# Get all the attributes from request.state
102110
state_dict = {}
@@ -105,9 +113,93 @@ async def test_middleware_public(request: Request) -> Dict[str, Any]:
105113
if not key.startswith("_") and not callable(getattr(request.state, key)):
106114
state_dict[key] = getattr(request.state, key)
107115

116+
# Check if any auth header was provided (optional for this endpoint)
117+
auth_header = request.headers.get("Authorization")
118+
auth_message = "No authentication required for this endpoint"
119+
if auth_header:
120+
auth_message += " (but you provided an auth header anyway)"
121+
108122
return {
109-
"message": "Middleware test - Public",
123+
"message": "Public endpoint - No authentication required",
124+
"auth_status": auth_message,
110125
"middleware_state": state_dict,
111126
"request_id": getattr(request.state, "request_id", None),
112-
"headers": dict(request.headers)
127+
"authorization_header": request.headers.get("Authorization", "Not provided"),
128+
}
129+
130+
131+
# Role-based access test endpoints
132+
from .middleware.auth import has_roles
133+
from .schemas.user import UserRole
134+
135+
@app.get("/test-role-admin")
136+
async def test_role_admin(
137+
request: Request,
138+
authorized: bool = has_roles([UserRole.ADMIN])
139+
) -> Dict[str, Any]:
140+
"""
141+
Test endpoint that requires the Admin role.
142+
143+
This demonstrates role-based access control using the has_roles dependency.
144+
Only users with the Admin role can access this endpoint.
145+
"""
146+
return {
147+
"message": "You have successfully accessed an admin-only endpoint",
148+
"user_id": request.state.user_id,
149+
"user_email": request.state.user_email,
150+
"role": "admin"
151+
}
152+
153+
@app.get("/test-role-volunteer")
154+
async def test_role_volunteer(
155+
request: Request,
156+
authorized: bool = has_roles([UserRole.VOLUNTEER])
157+
) -> Dict[str, Any]:
158+
"""
159+
Test endpoint that requires the Volunteer role.
160+
161+
This demonstrates role-based access control using the has_roles dependency.
162+
Only users with the Volunteer role can access this endpoint.
163+
"""
164+
return {
165+
"message": "You have successfully accessed a volunteer-only endpoint",
166+
"user_id": request.state.user_id,
167+
"user_email": request.state.user_email,
168+
"role": "volunteer"
169+
}
170+
171+
@app.get("/test-role-participant")
172+
async def test_role_participant(
173+
request: Request,
174+
authorized: bool = has_roles([UserRole.PARTICIPANT])
175+
) -> Dict[str, Any]:
176+
"""
177+
Test endpoint that requires the Participant role.
178+
179+
This demonstrates role-based access control using the has_roles dependency.
180+
Only users with the Participant role can access this endpoint.
181+
"""
182+
return {
183+
"message": "You have successfully accessed a participant-only endpoint",
184+
"user_id": request.state.user_id,
185+
"user_email": request.state.user_email,
186+
"role": "participant"
187+
}
188+
189+
@app.get("/test-role-multiple")
190+
async def test_role_multiple(
191+
request: Request,
192+
authorized: bool = has_roles([UserRole.ADMIN, UserRole.VOLUNTEER])
193+
) -> Dict[str, Any]:
194+
"""
195+
Test endpoint that requires either Admin OR Volunteer role.
196+
197+
This demonstrates role-based access control with multiple allowed roles.
198+
Users with either Admin or Volunteer roles can access this endpoint.
199+
"""
200+
return {
201+
"message": "You have successfully accessed an endpoint requiring admin OR volunteer role",
202+
"user_id": request.state.user_id,
203+
"user_email": request.state.user_email,
204+
"roles_allowed": ["admin", "volunteer"]
113205
}

0 commit comments

Comments
 (0)