diff --git a/litellm/proxy/auth/auth_checks_organization.py b/litellm/proxy/auth/auth_checks_organization.py index e96a5c61fc05..4e68b69ed283 100644 --- a/litellm/proxy/auth/auth_checks_organization.py +++ b/litellm/proxy/auth/auth_checks_organization.py @@ -1,11 +1,10 @@ """ -Auth Checks for Organizations +Organization Auth Checks with Enhanced Security and Type Safety """ +from uuid import UUID from typing import Dict, List, Optional, Tuple - from fastapi import status - from litellm.proxy._types import * @@ -13,130 +12,96 @@ def organization_role_based_access_check( request_body: dict, user_object: Optional[LiteLLM_UserTable], route: str, -): +) -> None: """ - Role based access control checks only run if a user is part of an Organization - - Organization Checks: - ONLY RUN IF user_object.organization_memberships is not None - - 1. Only Proxy Admins can access /organization/new - 2. IF route is a LiteLLMRoutes.org_admin_only_routes, then check if user is an Org Admin for that organization - + Enhanced organization access control with: + - UUID validation for organization_id + - Type-safe role comparisons + - Consolidated permission checks """ - if user_object is None: return - passed_organization_id: Optional[str] = request_body.get("organization_id", None) - - if route == "/organization/new": - if user_object.user_role != LitellmUserRoles.PROXY_ADMIN.value: + # Validate organization_id format if present + passed_organization_id = request_body.get("organization_id") + if passed_organization_id: + try: + UUID(passed_organization_id, version=4) + except ValueError as e: raise ProxyException( - message=f"Only proxy admins can create new organizations. You are {user_object.user_role}", + message=f"Invalid organization_id format: {str(e)}", type=ProxyErrorTypes.auth_error.value, - param="user_role", - code=status.HTTP_401_UNAUTHORIZED, + code=status.HTTP_400_BAD_REQUEST, ) - if user_object.user_role == LitellmUserRoles.PROXY_ADMIN.value: - return - - # Checks if route is an Org Admin Only Route - if route in LiteLLMRoutes.org_admin_only_routes.value: - ( - _user_organizations, - _user_organization_role_mapping, - ) = get_user_organization_info(user_object) - - if user_object.organization_memberships is None: + # Proxy admin-only route check + if route == "/organization/new": + if user_object.user_role != LitellmUserRoles.PROXY_ADMIN: raise ProxyException( - message=f"Tried to access route={route} but you are not a member of any organization. Please contact the proxy admin to request access.", + message="Insufficient permissions for organization creation", + detail=f"Required role: {LitellmUserRoles.PROXY_ADMIN.value}", type=ProxyErrorTypes.auth_error.value, - param="organization_id", - code=status.HTTP_401_UNAUTHORIZED, + code=status.HTTP_403_FORBIDDEN, ) + return - if passed_organization_id is None: - raise ProxyException( - message="Passed organization_id is None, please pass an organization_id in your request", - type=ProxyErrorTypes.auth_error.value, - param="organization_id", - code=status.HTTP_401_UNAUTHORIZED, - ) + # Bypass checks for proxy admins + if user_object.user_role == LitellmUserRoles.PROXY_ADMIN: + return + + # Get organization info once + user_orgs, role_mapping = get_user_organization_info(user_object) - user_role: Optional[LitellmUserRoles] = _user_organization_role_mapping.get( - passed_organization_id - ) - if user_role is None: + # Org admin required routes + if route in LiteLLMRoutes.org_admin_only_routes.value: + if not passed_organization_id: raise ProxyException( - message=f"You do not have a role within the selected organization. Passed organization_id: {passed_organization_id}. Please contact the organization admin to request access.", + message="organization_id required for this operation", type=ProxyErrorTypes.auth_error.value, - param="organization_id", - code=status.HTTP_401_UNAUTHORIZED, + code=status.HTTP_400_BAD_REQUEST, ) - if user_role != LitellmUserRoles.ORG_ADMIN.value: + user_role = role_mapping.get(passed_organization_id, LitellmUserRoles.INTERNAL_USER) + + if user_role != LitellmUserRoles.ORG_ADMIN: + available_orgs = "\n".join([f"{k}: {v.value}" for k,v in role_mapping.items()]) raise ProxyException( - message=f"You do not have the required role to perform {route} in Organization {passed_organization_id}. Your role is {user_role} in Organization {passed_organization_id}", + message="Organization admin privileges required", + detail=f"Required role: {LitellmUserRoles.ORG_ADMIN.value}\nYour roles:\n{available_orgs}", type=ProxyErrorTypes.auth_error.value, - param="user_role", - code=status.HTTP_401_UNAUTHORIZED, - ) - elif route == "/team/new": - # if user is part of multiple teams, then they need to specify the organization_id - ( - _user_organizations, - _user_organization_role_mapping, - ) = get_user_organization_info(user_object) - if ( - user_object.organization_memberships is not None - and len(user_object.organization_memberships) > 0 - ): - if passed_organization_id is None: - raise ProxyException( - message=f"Passed organization_id is None, please specify the organization_id in your request. You are part of multiple organizations: {_user_organizations}", - type=ProxyErrorTypes.auth_error.value, - param="organization_id", - code=status.HTTP_401_UNAUTHORIZED, - ) - - _user_role_in_passed_org = _user_organization_role_mapping.get( - passed_organization_id + code=status.HTTP_403_FORBIDDEN, ) - if _user_role_in_passed_org != LitellmUserRoles.ORG_ADMIN.value: - raise ProxyException( - message=f"You do not have the required role to call {route}. Your role is {_user_role_in_passed_org} in Organization {passed_organization_id}", - type=ProxyErrorTypes.auth_error.value, - param="user_role", - code=status.HTTP_401_UNAUTHORIZED, - ) def get_user_organization_info( user_object: LiteLLM_UserTable, -) -> Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]: +) -> Tuple[List[str], Dict[str, LitellmUserRoles]]: + """ + Returns validated organization info with: + - Type-safe role conversions + - Empty collection handling """ - Helper function to extract user organization information. + if not user_object or not user_object.organization_memberships: + return [], {} - Args: - user_object (LiteLLM_UserTable): The user object containing organization memberships. + organizations = [] + role_mapping = {} - Returns: - Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]: A tuple containing: - - List of organization IDs the user is a member of - - Dictionary mapping organization IDs to user roles - """ - _user_organizations: List[str] = [] - _user_organization_role_mapping: Dict[str, Optional[LitellmUserRoles]] = {} + for membership in user_object.organization_memberships: + if not membership.organization_id: + continue - if user_object.organization_memberships is not None: - for _membership in user_object.organization_memberships: - if _membership.organization_id is not None: - _user_organizations.append(_membership.organization_id) - _user_organization_role_mapping[_membership.organization_id] = _membership.user_role # type: ignore + organizations.append(membership.organization_id) + + # Convert to enum with fallback + try: + role = LitellmUserRoles(membership.user_role) + except ValueError: + role = LitellmUserRoles.INTERNAL_USER + + role_mapping[membership.organization_id] = role - return _user_organizations, _user_organization_role_mapping + return organizations, role_mapping def _user_is_org_admin( @@ -144,20 +109,16 @@ def _user_is_org_admin( user_object: Optional[LiteLLM_UserTable] = None, ) -> bool: """ - Helper function to check if user is an org admin for the passed organization_id + Efficient admin check with: + - Early exit conditions + - Generator expression for performance """ - if request_data.get("organization_id", None) is None: - return False - - if user_object is None: + org_id = request_data.get("organization_id") + if not org_id or not user_object: return False - if user_object.organization_memberships is None: - return False - - for _membership in user_object.organization_memberships: - if _membership.organization_id == request_data.get("organization_id", None): - if _membership.user_role == LitellmUserRoles.ORG_ADMIN.value: - return True - - return False + return any( + m.organization_id == org_id + and m.user_role == LitellmUserRoles.ORG_ADMIN + for m in user_object.organization_memberships or [] + )