Skip to content

auth_checks_organization.py #10770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 72 additions & 111 deletions litellm/proxy/auth/auth_checks_organization.py
Original file line number Diff line number Diff line change
@@ -1,163 +1,124 @@
"""
Auth Checks for Organizations
Organization Auth Checks with Enhanced Security and Type Safety
"""

from uuid import UUID
from typing import Dict, List, Optional, Tuple

from fastapi import status

from litellm.proxy._types import *


def organization_role_based_access_check(
request_body: dict,
user_object: Optional[LiteLLM_UserTable],
route: str,
):
) -> None:
"""
Role based access control checks only run if a user is part of an Organization

Organization Checks:
ONLY RUN IF user_object.organization_memberships is not None

1. Only Proxy Admins can access /organization/new
2. IF route is a LiteLLMRoutes.org_admin_only_routes, then check if user is an Org Admin for that organization

Enhanced organization access control with:
- UUID validation for organization_id
- Type-safe role comparisons
- Consolidated permission checks
"""

if user_object is None:
return

passed_organization_id: Optional[str] = request_body.get("organization_id", None)

if route == "/organization/new":
if user_object.user_role != LitellmUserRoles.PROXY_ADMIN.value:
# Validate organization_id format if present
passed_organization_id = request_body.get("organization_id")
if passed_organization_id:
try:
UUID(passed_organization_id, version=4)
except ValueError as e:
raise ProxyException(
message=f"Only proxy admins can create new organizations. You are {user_object.user_role}",
message=f"Invalid organization_id format: {str(e)}",
type=ProxyErrorTypes.auth_error.value,
param="user_role",
code=status.HTTP_401_UNAUTHORIZED,
code=status.HTTP_400_BAD_REQUEST,
)

if user_object.user_role == LitellmUserRoles.PROXY_ADMIN.value:
return

# Checks if route is an Org Admin Only Route
if route in LiteLLMRoutes.org_admin_only_routes.value:
(
_user_organizations,
_user_organization_role_mapping,
) = get_user_organization_info(user_object)

if user_object.organization_memberships is None:
# Proxy admin-only route check
if route == "/organization/new":
if user_object.user_role != LitellmUserRoles.PROXY_ADMIN:
raise ProxyException(
message=f"Tried to access route={route} but you are not a member of any organization. Please contact the proxy admin to request access.",
message="Insufficient permissions for organization creation",
detail=f"Required role: {LitellmUserRoles.PROXY_ADMIN.value}",
type=ProxyErrorTypes.auth_error.value,
param="organization_id",
code=status.HTTP_401_UNAUTHORIZED,
code=status.HTTP_403_FORBIDDEN,
)
return

if passed_organization_id is None:
raise ProxyException(
message="Passed organization_id is None, please pass an organization_id in your request",
type=ProxyErrorTypes.auth_error.value,
param="organization_id",
code=status.HTTP_401_UNAUTHORIZED,
)
# Bypass checks for proxy admins
if user_object.user_role == LitellmUserRoles.PROXY_ADMIN:
return

# Get organization info once
user_orgs, role_mapping = get_user_organization_info(user_object)

user_role: Optional[LitellmUserRoles] = _user_organization_role_mapping.get(
passed_organization_id
)
if user_role is None:
# Org admin required routes
if route in LiteLLMRoutes.org_admin_only_routes.value:
if not passed_organization_id:
raise ProxyException(
message=f"You do not have a role within the selected organization. Passed organization_id: {passed_organization_id}. Please contact the organization admin to request access.",
message="organization_id required for this operation",
type=ProxyErrorTypes.auth_error.value,
param="organization_id",
code=status.HTTP_401_UNAUTHORIZED,
code=status.HTTP_400_BAD_REQUEST,
)

if user_role != LitellmUserRoles.ORG_ADMIN.value:
user_role = role_mapping.get(passed_organization_id, LitellmUserRoles.INTERNAL_USER)

if user_role != LitellmUserRoles.ORG_ADMIN:
available_orgs = "\n".join([f"{k}: {v.value}" for k,v in role_mapping.items()])
raise ProxyException(
message=f"You do not have the required role to perform {route} in Organization {passed_organization_id}. Your role is {user_role} in Organization {passed_organization_id}",
message="Organization admin privileges required",
detail=f"Required role: {LitellmUserRoles.ORG_ADMIN.value}\nYour roles:\n{available_orgs}",
type=ProxyErrorTypes.auth_error.value,
param="user_role",
code=status.HTTP_401_UNAUTHORIZED,
)
elif route == "/team/new":
# if user is part of multiple teams, then they need to specify the organization_id
(
_user_organizations,
_user_organization_role_mapping,
) = get_user_organization_info(user_object)
if (
user_object.organization_memberships is not None
and len(user_object.organization_memberships) > 0
):
if passed_organization_id is None:
raise ProxyException(
message=f"Passed organization_id is None, please specify the organization_id in your request. You are part of multiple organizations: {_user_organizations}",
type=ProxyErrorTypes.auth_error.value,
param="organization_id",
code=status.HTTP_401_UNAUTHORIZED,
)

_user_role_in_passed_org = _user_organization_role_mapping.get(
passed_organization_id
code=status.HTTP_403_FORBIDDEN,
)
if _user_role_in_passed_org != LitellmUserRoles.ORG_ADMIN.value:
raise ProxyException(
message=f"You do not have the required role to call {route}. Your role is {_user_role_in_passed_org} in Organization {passed_organization_id}",
type=ProxyErrorTypes.auth_error.value,
param="user_role",
code=status.HTTP_401_UNAUTHORIZED,
)


def get_user_organization_info(
user_object: LiteLLM_UserTable,
) -> Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]:
) -> Tuple[List[str], Dict[str, LitellmUserRoles]]:
"""
Returns validated organization info with:
- Type-safe role conversions
- Empty collection handling
"""
Helper function to extract user organization information.
if not user_object or not user_object.organization_memberships:
return [], {}

Args:
user_object (LiteLLM_UserTable): The user object containing organization memberships.
organizations = []
role_mapping = {}

Returns:
Tuple[List[str], Dict[str, Optional[LitellmUserRoles]]]: A tuple containing:
- List of organization IDs the user is a member of
- Dictionary mapping organization IDs to user roles
"""
_user_organizations: List[str] = []
_user_organization_role_mapping: Dict[str, Optional[LitellmUserRoles]] = {}
for membership in user_object.organization_memberships:
if not membership.organization_id:
continue

if user_object.organization_memberships is not None:
for _membership in user_object.organization_memberships:
if _membership.organization_id is not None:
_user_organizations.append(_membership.organization_id)
_user_organization_role_mapping[_membership.organization_id] = _membership.user_role # type: ignore
organizations.append(membership.organization_id)

# Convert to enum with fallback
try:
role = LitellmUserRoles(membership.user_role)
except ValueError:
role = LitellmUserRoles.INTERNAL_USER

role_mapping[membership.organization_id] = role

return _user_organizations, _user_organization_role_mapping
return organizations, role_mapping


def _user_is_org_admin(
request_data: dict,
user_object: Optional[LiteLLM_UserTable] = None,
) -> bool:
"""
Helper function to check if user is an org admin for the passed organization_id
Efficient admin check with:
- Early exit conditions
- Generator expression for performance
"""
if request_data.get("organization_id", None) is None:
return False

if user_object is None:
org_id = request_data.get("organization_id")
if not org_id or not user_object:
return False

if user_object.organization_memberships is None:
return False

for _membership in user_object.organization_memberships:
if _membership.organization_id == request_data.get("organization_id", None):
if _membership.user_role == LitellmUserRoles.ORG_ADMIN.value:
return True

return False
return any(
m.organization_id == org_id
and m.user_role == LitellmUserRoles.ORG_ADMIN
for m in user_object.organization_memberships or []
)
Loading