Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
**/venv
**/__pycache__
**/*.log
**/firebaseServiceAccount.json
**/serviceAccountKey.json
**/.DS_Store
**/*.cache
**/*.egg-info
25 changes: 25 additions & 0 deletions backend/app/auth_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from fastapi import FASTAPI, HTTPException, Depends
from firebase_admin import auth
import firebase_unit
from pydantic import BaseModel

app = FastAPI()
class User(BaseModel):
email: str
password: str

@app.post("/signup")
async def sign_up(user: User):
try:
new_user = auth.create_user(email=user.email, password=user.password)
return {"message": "User created successfully", "user_id": new_user.uid}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

@app.post("/login")
async def login(user: User):
try:
custom_token = auth.create_custom_token(user.email)
return {"token": custom_token}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
2 changes: 1 addition & 1 deletion backend/app/interfaces/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def get_users(self):
pass

@abstractmethod
def create_user(self, user, auth_id=None, signup_method="PASSWORD"):
def create_user(self, user, signup_method="PASSWORD"):
"""
Create a user, email verification configurable

Expand Down
61 changes: 61 additions & 0 deletions backend/app/middlewares/authmiddleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import firebase

from firebase import credentials, auth
from fastapi import FastAPI, Request, HTTPException, status, Depends, HTTPException, Security
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import List

app = FastAPI()

#use is_authorized_by_role to get info about user - NOT DONE
#input is user (get info when accessing endpoint?)
#output is user info found using token - token, role, etc.
class get_user_info(is_authorized_by_role):
def __init__(self, roles: List[str]):
#self.roles = roles

def __call__(self, user: dict = Depends(verify_firebase_token)):
#role = user.get("role")
#if role not in self.roles:
# raise HTTPException(
# status_code=403,
# detail=f"Access denied: role '{role}' not authorized"
# )
#return user

#middleware function
#input allowed roles, endpoint
#output: allowing/disallowing access to endpoint
def role_based_access_control(allowed_roles: List[str]):
def middleware_decorator(endpoint: Callable):
async def wrapper(request: Request, *args, **kwargs):
roles = getattr(request.state, "user_roles", [])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's much much safer to get a user's roles from firebase or the db ourselves using the methods in the auth service, instead of sending it in the request

imagine someone malicious adding in roles to their requests that they don't have access to, which would let them access other roles that they haven't been assigned to in the db

if not allowed_roles:
return await endpoint(request, *args, **kwargs)
if not any(role in allowed_roles for role in roles):
raise HTTPException(
status_code=403,
detail="Access denied: user cannot access this endpoint"
)
return await endpoint(request, *args, **kwargs)
return wrapper
return middleware_decorator

#add middleware to request state
#basically applies middleware function to all requests
@app.middleware("http")
async def add_user_roles(request: Request, call_next):
#before request
request.state.user_roles = ["user"] #get roles from is_authorized_by_roles
#run request
response = await call_next(request)
#after request
return response

#example: applying middleware to endpoint
@app.post("/admin_only")
@role_based_access_control(allowed_roles=["admin"])
async def admin_endpoint(request: Request):
return {"message": "This is an admin endpoint"}
20 changes: 20 additions & 0 deletions backend/app/routes/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ..schemas.auth import AuthResponse, LoginRequest
from ..services.implementations.auth_service import AuthService
from ..services.implementations.user_service import UserService
from ..utilities.db_utils import get_db
import logging

router = APIRouter(prefix="/auth", tags=["auth"])

def get_auth_service(db: Session = Depends(get_db)):
logger = logging.getLogger(__name__)
return AuthService(logger=logger, user_service=UserService(db))

@router.post("/login", response_model=AuthResponse)
async def login(
credentials: LoginRequest,
auth_service: AuthService = Depends(get_auth_service)
):
return auth_service.generate_token(credentials.email, credentials.password)
31 changes: 31 additions & 0 deletions backend/app/routes/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from app.schemas.user import UserCreateRequest, UserCreateResponse
from app.services.implementations.user_service import UserService
from app.utilities.db_utils import get_db

router = APIRouter(
prefix="/users",
tags=["users"],
)

# TODO:
# send email verification via auth_service
# allow signup methods other than email (like sign up w Google)??


def get_user_service(db: Session = Depends(get_db)):
return UserService(db)


@router.post("/", response_model=UserCreateResponse)
async def create_user(
user: UserCreateRequest, user_service: UserService = Depends(get_user_service)
):
try:
return await user_service.create_user(user)
except HTTPException as http_ex:
raise http_ex
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
23 changes: 23 additions & 0 deletions backend/app/schemas/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pydantic import BaseModel, ConfigDict
from .user import UserCreateResponse

class LoginRequest(BaseModel):
email: str
password: str

class Token(BaseModel):
"""
For authentication tokens from Firebase
Access tokens are short-lived and used to access resources
Refresh tokens are long-lived and used to obtain new access tokens
"""
access_token: str
refresh_token: str

model_config = ConfigDict(from_attributes=True)

class AuthResponse(Token):
"""Authentication response containing tokens and user info"""
user: UserCreateResponse

model_config = ConfigDict(from_attributes=True)
89 changes: 89 additions & 0 deletions backend/app/schemas/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
Pydantic schemas for user-related data validation and serialization.
Handles user CRUD and response models for the API.
"""

from enum import Enum
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator

# TODO:
# confirm complexity rules for fields (such as password)


class SignUpMethod(str, Enum):
"""Authentication methods supported for user signup"""

PASSWORD = "PASSWORD"
GOOGLE = "GOOGLE"


class UserRole(str, Enum):
"""
Enum for possible user roles.
"""

PARTICIPANT = "participant"
VOLUNTEER = "volunteer"
ADMIN = "admin"

@classmethod
def to_role_id(cls, role: "UserRole") -> int:
role_map = {cls.PARTICIPANT: 1, cls.VOLUNTEER: 2, cls.ADMIN: 3}
return role_map[role]


class UserBase(BaseModel):
"""
Base schema for user model with common attributes shared across schemas.
"""

first_name: str = Field(..., min_length=1, max_length=50)
last_name: str = Field(..., min_length=1, max_length=50)
email: EmailStr
role: UserRole


class UserCreateRequest(UserBase):
"""
Request schema for user creation with conditional password validation
"""

password: Optional[str] = Field(None, min_length=8)
auth_id: Optional[str] = Field(None)
signup_method: SignUpMethod = Field(default=SignUpMethod.PASSWORD)

@field_validator("password")
def validate_password(cls, password: Optional[str], info):
signup_method = info.data.get("signup_method")

if signup_method == SignUpMethod.PASSWORD and not password:
raise ValueError("Password is required for password signup")

if password:
if not any(char.isdigit() for char in password):
raise ValueError("Password must contain at least one digit")
if not any(char.isupper() for char in password):
raise ValueError("Password must contain at least one uppercase letter")
if not any(char.islower() for char in password):
raise ValueError("Password must contain at least one lowercase letter")

return password


class UserCreateResponse(BaseModel):
"""
Response schema for user creation, maps directly from ORM User object.
"""

id: UUID
first_name: str
last_name: str
email: EmailStr
role_id: int
auth_id: str

# from_attributes enables automatic mapping from SQLAlchemy model to Pydantic model
model_config = ConfigDict(from_attributes=True)
17 changes: 13 additions & 4 deletions backend/app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,32 @@
from dotenv import load_dotenv
from fastapi import FastAPI

from app.routes import email

from . import models
from app.routes import auth, email

load_dotenv()

# we need to load env variables before initialization code runs
from . import models # noqa: E402
from .routes import user # noqa: E402
from .utilities.firebase_init import initialize_firebase # noqa: E402

log = logging.getLogger("uvicorn")


@asynccontextmanager
async def lifespan(_: FastAPI):
log.info("Starting up...")
models.run_migrations()
initialize_firebase()
yield
log.info("Shutting down...")


# Source: https://stackoverflow.com/questions/77170361/
# running-alembic-migrations-on-fastapi-startup
app = FastAPI(lifespan=lifespan)

app.include_router(auth.router)
app.include_router(user.router)
app.include_router(email.router)


Expand All @@ -37,3 +42,7 @@ def read_root():
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}

#initializing firebase
cred = credentials.Certificate("llsc/backend/python/firebaseServiceAccount.json")
firebase_admin.initialize_app(cred)
Loading