Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
**/.DS_Store
**/*.cache
**/*.egg-info
**/test.db
3 changes: 3 additions & 0 deletions backend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Firebase
serviceAccountKey.json
15 changes: 15 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ pdm run ruff format .
## Environment Variables
Environment variables are currently stored in an .env file within the base repository (not the backend folder). You will need to copy the local environment variables stored in the following notion [page](https://www.notion.so/uwblueprintexecs/Environment-Variables-11910f3fb1dc80e4bc67d35c3d65d073?pvs=4) to get the database working.

### Firebase Configuration
To set up Firebase authentication:

1. Place your `serviceAccountKey.json` file in the `backend/` directory
- This file should be obtained from your Firebase Console
- Go to Project Settings > Service Accounts > Generate New Private Key
- The file contains sensitive credentials and is automatically gitignored

2. Ensure your `.env` file includes the following Firebase-related variables:
```
FIREBASE_WEB_API_KEY=your_web_api_key
```
You can find these values in your Firebase Console under Project Settings.

Note: Never commit `serviceAccountKey.json` to version control. It's already added to `.gitignore` for security.

## Adding a new model
When adding a new model, make sure to add it to `app/models/__init__.py` so that the migration script can pick it up when autogenerating the new migration.
Expand Down
53 changes: 53 additions & 0 deletions backend/app/middleware/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
from typing import List

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from ..utilities.constants import LOGGER_NAME
from ..utilities.service_utils import get_auth_service

security = HTTPBearer()
logger = logging.getLogger(LOGGER_NAME("auth"))


def has_roles(required_roles: List[str]):
"""
FastAPI dependency that checks if the authenticated user has
one of the required roles.

Args:
required_roles: List of roles that can access the endpoint

Returns:
A dependency that validates the user has one of the specified roles

Example:
@app.get("/admin-only")
async def admin_endpoint(authorized: bool = has_roles(["admin"])):
return {"message": "You have admin access"}
"""

async def role_validator(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
auth_service=Depends(get_auth_service),
) -> bool:
# Get the token from authorization header
token = credentials.credentials

# Use the auth service to check if the user has the required role
is_authorized = auth_service.is_authorized_by_role(token, set(required_roles))

if not is_authorized:
logger.warning(
f"Access denied: user doesn't have required roles: {required_roles}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access denied: requires one of these roles: {required_roles}",
)

return True

return Depends(role_validator)
90 changes: 90 additions & 0 deletions backend/app/middleware/auth_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
from typing import List

import firebase_admin.auth
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.types import ASGIApp

from app.utilities.constants import LOGGER_NAME


class AuthMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp, public_paths: List[str] = None):
super().__init__(app)
self.public_paths = public_paths or []
self.logger = logging.getLogger(LOGGER_NAME("auth_middleware"))

def is_public_path(self, path: str) -> bool:
return path in self.public_paths

async def dispatch(self, request: Request, call_next):
if self.is_public_path(request.url.path):
self.logger.info(f"Skipping auth for public path: {request.url.path}")
return await call_next(request)

# Get authentication token from header
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
self.logger.warning(
f"Missing or invalid auth header for {request.url.path}"
)
return JSONResponse(
status_code=401, content={"detail": "Authentication required"}
)

token = auth_header.split(" ")[1]
try:
# Verify the token with Firebase
self.logger.info(f"Verifying token for request to {request.url.path}")
decoded_token = firebase_admin.auth.verify_id_token(
token, check_revoked=True
)

# Get Firebase user information
firebase_user = firebase_admin.auth.get_user(decoded_token["uid"])

request.state.user_id = decoded_token["uid"]
request.state.user_email = decoded_token.get("email")
request.state.email_verified = firebase_user.email_verified
request.state.user_claims = decoded_token.get("claims", {})

# Add complete user info for convenience
request.state.user_info = {
"uid": decoded_token["uid"],
"email": decoded_token.get("email"),
"name": decoded_token.get("name", ""),
"picture": decoded_token.get("picture", ""),
"email_verified": firebase_user.email_verified,
}

response = await call_next(request)

if isinstance(response, Response):
response.headers["X-Auth-User-ID"] = decoded_token["uid"]

return response

except firebase_admin.auth.RevokedIdTokenError:
self.logger.warning(f"Token has been revoked: {request.url.path}")
return JSONResponse(
status_code=401,
content={"detail": "Token has been revoked. Please reauthenticate."},
)
except firebase_admin.auth.ExpiredIdTokenError:
self.logger.warning(f"Token has expired: {request.url.path}")
return JSONResponse(
status_code=401,
content={"detail": "Token has expired. Please reauthenticate."},
)
except firebase_admin.auth.InvalidIdTokenError as e:
self.logger.warning(f"Invalid token: {request.url.path}, error: {str(e)}")
return JSONResponse(
status_code=401, content={"detail": "Invalid authentication token"}
)
except Exception as e:
self.logger.error(f"Authentication error for {request.url.path}: {str(e)}")
return JSONResponse(
status_code=401, content={"detail": f"Authentication failed: {str(e)}"}
)
79 changes: 79 additions & 0 deletions backend/app/routes/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from ..middleware.auth import UserRole
from ..schemas.auth import AuthResponse, LoginRequest, RefreshRequest, Token
from ..schemas.user import UserCreateRequest, UserCreateResponse
from ..services.implementations.auth_service import AuthService
from ..services.implementations.user_service import UserService
from ..utilities.service_utils import get_auth_service, get_user_service

router = APIRouter(prefix="/auth", tags=["auth"])
security = HTTPBearer()


# TODO: ADD RATE LIMITING
@router.post("/register", response_model=UserCreateResponse)
async def register_user(
user: UserCreateRequest, user_service: UserService = Depends(get_user_service)
):
try:
return await user_service.create_user(user)
except HTTPException as http_ex:
raise http_ex
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.post("/login", response_model=AuthResponse)
async def login(
request: Request,
credentials: LoginRequest,
auth_service: AuthService = Depends(get_auth_service),
):
try:
is_admin_portal = request.headers.get("X-Admin-Portal") == "true"
auth_response = auth_service.generate_token(
credentials.email, credentials.password
)
if is_admin_portal and not auth_service.is_authorized_by_role(
auth_response.access_token, {UserRole.ADMIN}
):
raise HTTPException(
status_code=403,
detail="Access denied. Admin privileges required for admin portal",
)

return auth_response

except HTTPException as http_ex:
raise http_ex
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.post("/logout")
async def logout(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
auth_service: AuthService = Depends(get_auth_service),
):
try:
user_id = request.state.user_id
if not user_id:
raise HTTPException(status_code=401, detail="Authentication required")

auth_service.revoke_tokens(user_id)
return {"message": "Successfully logged out"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.post("/refresh", response_model=Token)
async def refresh(
refresh_data: RefreshRequest, auth_service: AuthService = Depends(get_auth_service)
):
try:
return auth_service.renew_token(refresh_data.refresh_token)
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
99 changes: 99 additions & 0 deletions backend/app/routes/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from typing import Any, Dict

from fastapi import APIRouter, Request

from ..middleware.auth import has_roles
from ..schemas.user import UserRole

router = APIRouter(prefix="/test", tags=["test"])


@router.get("/middleware")
async def test_middleware(request: Request) -> Dict[str, Any]:
state_dict = {
key: getattr(request.state, key)
for key in dir(request.state)
if not key.startswith("_") and not callable(getattr(request.state, key))
}

return {
"message": "Authentication successful! User info from Firebase token:",
"middleware_state": state_dict,
"user_id": getattr(request.state, "user_id", None),
"user_email": getattr(request.state, "user_email", None),
"email_verified": getattr(request.state, "email_verified", None),
"user_claims": getattr(request.state, "user_claims", None),
"user_info": getattr(request.state, "user_info", None),
"request_id": getattr(request.state, "request_id", None),
"authorization_header": request.headers.get("Authorization", "Not provided"),
}


@router.get("/middleware-public")
async def test_middleware_public(request: Request) -> Dict[str, Any]:
state_dict = {
key: getattr(request.state, key)
for key in dir(request.state)
if not key.startswith("_") and not callable(getattr(request.state, key))
}

auth_header = request.headers.get("Authorization")
auth_message = "No authentication required for this endpoint"
if auth_header:
auth_message += " (but you provided an auth header anyway)"

return {
"message": "Public endpoint - No authentication required",
"auth_status": auth_message,
"middleware_state": state_dict,
"request_id": getattr(request.state, "request_id", None),
"authorization_header": request.headers.get("Authorization", "Not provided"),
}


@router.get("/role-admin")
async def test_role_admin(
request: Request, authorized: bool = has_roles([UserRole.ADMIN])
) -> Dict[str, Any]:
return {
"message": "Successfully accessed an admin-only endpoint",
"user_id": request.state.user_id,
"user_email": request.state.user_email,
"role": "admin",
}


@router.get("/role-volunteer")
async def test_role_volunteer(
request: Request, authorized: bool = has_roles([UserRole.VOLUNTEER])
) -> Dict[str, Any]:
return {
"message": "Successfully accessed a volunteer-only endpoint",
"user_id": request.state.user_id,
"user_email": request.state.user_email,
"role": "volunteer",
}


@router.get("/role-participant")
async def test_role_participant(
request: Request, authorized: bool = has_roles([UserRole.PARTICIPANT])
) -> Dict[str, Any]:
return {
"message": "Successfully accessed a participant-only endpoint",
"user_id": request.state.user_id,
"user_email": request.state.user_email,
"role": "participant",
}


@router.get("/role-multiple")
async def test_role_multiple(
request: Request, authorized: bool = has_roles([UserRole.ADMIN, UserRole.VOLUNTEER])
) -> Dict[str, Any]:
return {
"message": "Successfully accessed an admin or volunteer endpoint",
"user_id": request.state.user_id,
"user_email": request.state.user_email,
"roles_allowed": ["admin", "volunteer"],
}
15 changes: 7 additions & 8 deletions backend/app/routes/user.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.schemas.user import UserCreateRequest, UserCreateResponse
from app.middleware.auth import has_roles
from app.schemas.user import UserCreateRequest, UserCreateResponse, UserRole
from app.services.implementations.user_service import UserService
from app.utilities.db_utils import get_db
from app.utilities.service_utils import get_user_service

router = APIRouter(
prefix="/users",
Expand All @@ -15,13 +15,12 @@
# allow signup methods other than email (like sign up w Google)??


def get_user_service(db: Session = Depends(get_db)):
return UserService(db)


# admin only manually create user, not sure if this is needed
@router.post("/", response_model=UserCreateResponse)
async def create_user(
user: UserCreateRequest, user_service: UserService = Depends(get_user_service)
user: UserCreateRequest,
user_service: UserService = Depends(get_user_service),
authorized: bool = has_roles([UserRole.ADMIN]),
):
try:
return await user_service.create_user(user)
Expand Down
Loading