Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions backend/python/app/dependencies/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,15 @@ async def check_role(
access_token: str = Depends(get_access_token),
session: AsyncSession = Depends(get_session),
) -> bool:
try:
# TODO: Fix Authorization and Role Based Management in future commit.

# authorized = await auth_service.is_authorized_by_role(
# session, access_token, roles
# )
# if not authorized:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="You are not authorized to make this request.",
# )

return True
except Exception as e:
authorized = await auth_service.is_authorized_by_role(
session, access_token, roles
)
if not authorized:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to make this request.",
) from e
)
return True

return check_role

Expand Down Expand Up @@ -137,7 +128,7 @@ def check_email(access_token: str = Depends(get_access_token)) -> bool:


# Common authorization dependencies
require_driver = require_authorization_by_role({"Driver"})
require_driver = require_authorization_by_role({"driver"})
Copy link
Collaborator

@ludavidca ludavidca Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick food for thought, in our codebase, there are two roles drivers and admin, and some routes are admin only. This means that by default, admin should be able to access all routes, so it might not make sense to have a require driver.

This gives 2 solutions:

  • Simpler: Delete the require driver entirely and only use require_admin, and only require valid auth checks for some api routes.
  • More Complex but better: delete require_driver, and use ID-based checks for driver self-service (e.g., driver can only update their own profile)
    Admin ID bypasses all id checks or has access to all IDs

Lmk which one you would prefer and your thoughts!



def get_current_user_id(access_token: str = Depends(get_access_token)) -> str:
Expand Down
4 changes: 3 additions & 1 deletion backend/python/app/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class Admin(AdminBase, BaseModel, table=True):
__tablename__ = "admin_info"

admin_id: UUID = Field(default_factory=uuid4, primary_key=True)
user_id: UUID = Field(foreign_key="users.user_id", unique=True, nullable=False)
user_id: UUID = Field(
foreign_key="users.user_id", unique=True, nullable=False, ondelete="CASCADE"
)

user: User = Relationship()

Expand Down
4 changes: 3 additions & 1 deletion backend/python/app/models/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class Driver(DriverBase, BaseModel, table=True):
__tablename__ = "drivers"

driver_id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
user_id: UUID = Field(foreign_key="users.user_id", unique=True, nullable=False)
user_id: UUID = Field(
foreign_key="users.user_id", unique=True, nullable=False, ondelete="CASCADE"
)
user: User = Relationship()


Expand Down
75 changes: 2 additions & 73 deletions backend/python/app/services/implementations/auth_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import firebase_admin.auth
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.driver import DriverCreate
from app.models.user import UserCreate
from app.schemas.auth import AuthResponse, TokenResponse
from app.utilities.firebase_rest_client import FirebaseRestClient

Expand Down Expand Up @@ -76,73 +74,6 @@ async def generate_token(
# Always return the same generic error message to prevent enumeration
raise ValueError("Invalid email or password") from e

async def generate_token_for_oauth(
self, session: AsyncSession, id_token: str
) -> AuthResponse:
try:
# Verify the ID token with Firebase
decoded_token: dict[str, str] = firebase_admin.auth.verify_id_token(
id_token
)
user_id = decoded_token["uid"]
email = decoded_token["email"]

# If user already has a login with this email, just return the token
try:
# Note: an error message will be logged from UserService if this lookup fails.
# You may want to silence the logger for this special OAuth user lookup case
user = await self.user_service.get_user_by_email(session, email)
if user is None:
self.logger.warning(
f"Firebase user {email} exists but not found in database - potential data inconsistency"
)
raise ValueError("Invalid email or password")

return AuthResponse(
access_token=id_token,
id=user.user_id,
name=user.name,
email=user.email,
)
except Exception:
pass

# Create new user and driver for OAuth
user = await self.user_service.create_user(
session,
UserCreate(
name=decoded_token.get("name", "")
if decoded_token.get("name")
else "",
email=email,
password="placeholder", # TODO: How to handle this?
),
auth_id=user_id,
signup_method="GOOGLE",
)
await self.driver_service.create_driver(
session,
DriverCreate(
phone="", # OAuth users don't have phone initially
address="", # OAuth users don't have address initially
license_plate="", # OAuth users don't have license plate initially
car_make_model="", # OAuth users don't have car info initially
user_id=user.user_id,
),
)
return AuthResponse(
access_token=id_token,
id=user.user_id,
name=user.name,
email=user.email,
)
except Exception as e:
reason = getattr(e, "message", None)
self.logger.error(
f"Failed to generate token for user with OAuth id token. Reason = {reason if reason else str(e)}"
)
raise e

async def revoke_tokens(self, session: AsyncSession, user_id: UUID) -> None:
try:
auth_id = await self.user_service.get_auth_id_by_user_id(session, user_id)
Expand Down Expand Up @@ -222,10 +153,8 @@ def send_email_verification_link(self, email: str) -> None:
raise e

async def is_authorized_by_role(
self, _session: AsyncSession, access_token: str, _roles: set[str]
self, _session: AsyncSession, access_token: str, roles: set[str]
) -> bool:
# TODO: Maybe add db role check for extra security? I highly doubt users will switch roles though...
# Also would have to deal with performance bottlenecks
try:
decoded_id_token = firebase_admin.auth.verify_id_token(
access_token, check_revoked=True
Expand All @@ -237,7 +166,7 @@ async def is_authorized_by_role(
)
return False
# Allow if role is in the authorized set
return user_role in _roles
return user_role in roles
except Exception as e:
self.logger.error(f"Authorization failed: {type(e).__name__}: {e!s}")
return False
Expand Down
11 changes: 3 additions & 8 deletions backend/python/app/services/implementations/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,15 @@ async def create_user(
self,
session: AsyncSession,
user_data: UserCreate,
auth_id: str | None = None,
signup_method: str = "PASSWORD",
) -> User:
"""Create new user with Firebase integration"""
firebase_user: UserRecord | None = None

try:
# Create Firebase user
if signup_method == "PASSWORD":
firebase_user = firebase_admin.auth.create_user(
email=user_data.email, password=user_data.password
)
elif signup_method == "GOOGLE":
firebase_user = firebase_admin.auth.get_user(uid=auth_id)
firebase_user = firebase_admin.auth.create_user(
email=user_data.email, password=user_data.password
)

# Create database user
if firebase_user is None:
Expand Down
44 changes: 44 additions & 0 deletions backend/python/app/update_firebase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from firebase_admin import auth

from app import initialize_firebase

admin_email = "[email protected]"

Comment on lines +4 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be best to have this as an env variable to avoid hardcoded values


def update_all_users_role(role_name: str) -> None:
"""
Comment on lines +7 to +9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job on implementing a way to update all roles!

I realize that I should have probably better specified this in the ticket, but can we add another function to update only one user to admin or driver instead? This way, we won't have people changing other people's roles in the middle of development. Thanks!

Iterates through all Firebase users and sets a custom 'role' claim.
Always ensures the admin only has the role 'admin'
"""
initialize_firebase()
print(f"Starting update: Setting all non admin users to role: {role_name}")

# List all users (paginated)
page = auth.list_users()
count = 0

while page:
for user in page.users:
try:
if user.email == admin_email:
auth.set_custom_user_claims(user.uid, {"role": "admin"})
else:
# This overwrites existing claims, so be careful if you have other claims!
auth.set_custom_user_claims(user.uid, {"role": role_name})

print(f"Updated UID: {user.uid} ({user.email})")
count += 1
except Exception as e:
print(f"Failed to update {user.uid}: {e}")

# Get the next page of users
page = page.get_next_page()

print(f"\nSuccessfully updated {count} users.")


if __name__ == "__main__":
# Change to new desired role
# NOTE: This overwrites preexisting roles so be careful!
new_role = "driver"
update_all_users_role(new_role)
Comment on lines +42 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the update_your_user_role is implemented (with input param being your email). Please feel free to make that the default main function that runs, and make a variable your_email = "" to be used by the function!

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Add cascade delete to Driver and Admin

Revision ID: 01f342ea9ad6
Revises: ba76119b3e4c
Create Date: 2026-02-08 00:31:30.165149

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = '01f342ea9ad6'
down_revision = 'ba76119b3e4c'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(op.f('admin_info_user_id_fkey'), 'admin_info', type_='foreignkey')
op.create_foreign_key(None, 'admin_info', 'users', ['user_id'], ['user_id'], ondelete='CASCADE')
op.drop_constraint(op.f('drivers_user_id_fkey'), 'drivers', type_='foreignkey')
op.create_foreign_key(None, 'drivers', 'users', ['user_id'], ['user_id'], ondelete='CASCADE')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'drivers', type_='foreignkey')
op.create_foreign_key(op.f('drivers_user_id_fkey'), 'drivers', 'users', ['user_id'], ['user_id'])
op.drop_constraint(None, 'admin_info', type_='foreignkey')
op.create_foreign_key(op.f('admin_info_user_id_fkey'), 'admin_info', 'users', ['user_id'], ['user_id'])
# ### end Alembic commands ###