Skip to content

Commit

Permalink
Moved to use ID instead of email
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Fortunka committed Feb 19, 2025
1 parent 081055c commit fee832a
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 165 deletions.
41 changes: 17 additions & 24 deletions api_app/api/routes/workspace_users.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,60 @@
from fastapi import APIRouter, Depends, Response, status
from api.dependencies.workspaces import get_workspace_by_id_from_path
from models.domain.authentication import AssignmentType
from resources import strings
from services.authentication import get_access_service
from models.schemas.users import UsersInResponse, AssignableUsersInResponse
from models.schemas.roles import RolesInResponse
from services.authentication import get_current_admin_user, get_current_workspace_owner_or_researcher_user_or_airlock_manager_or_tre_admin
from services.logging import logger

workspaces_users_admin_router = APIRouter(dependencies=[Depends(get_current_admin_user)])
workspaces_users_shared_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager_or_tre_admin)])


@workspaces_users_shared_router.get("/workspaces/{workspace_id}/users", response_model=UsersInResponse, name=strings.API_GET_WORKSPACE_USERS)
async def get_workspace_users(workspace=Depends(get_workspace_by_id_from_path)) -> UsersInResponse:
access_service = get_access_service()
async def get_workspace_users(workspace=Depends(get_workspace_by_id_from_path), access_service=Depends(get_access_service)) -> UsersInResponse:
users = access_service.get_workspace_users(workspace)
return UsersInResponse(users=users)


@workspaces_users_admin_router.get("/workspaces/{workspace_id}/assignable-users", response_model=AssignableUsersInResponse, name=strings.API_GET_ASSIGNABLE_USERS)
async def get_assignable_users(filter: str = "", maxResultCount: int = 5, workspace=Depends(get_workspace_by_id_from_path)) -> AssignableUsersInResponse:
access_service = get_access_service()
async def get_assignable_users(filter: str = "", maxResultCount: int = 5, access_service=Depends(get_access_service)) -> AssignableUsersInResponse:
assignable_users = access_service.get_assignable_users(filter, maxResultCount)
return AssignableUsersInResponse(assignable_users=assignable_users)


@workspaces_users_admin_router.get("/workspaces/{workspace_id}/roles", response_model=RolesInResponse, name=strings.API_GET_WORKSPACE_ROLES)
async def get_workspace_roles(workspace=Depends(get_workspace_by_id_from_path)) -> RolesInResponse:
access_service = get_access_service()
async def get_workspace_roles(workspace=Depends(get_workspace_by_id_from_path), access_service=Depends(get_access_service)) -> RolesInResponse:
roles = access_service.get_workspace_roles(workspace)
return RolesInResponse(roles=roles)


@workspaces_users_admin_router.post("/workspaces/{workspace_id}/users/assign", status_code=status.HTTP_202_ACCEPTED, name=strings.API_ASSIGN_WORKSPACE_USER)
async def assign_workspace_user(response: Response, user_email: str, role_name: str, workspace=Depends(get_workspace_by_id_from_path)) -> UsersInResponse:
access_service = get_access_service()

user = access_service.get_user_by_email(user_email)
role = access_service.get_workspace_role_by_name(role_name, workspace)
async def assign_workspace_user(response: Response, user_id: str, role_id: str, workspace=Depends(get_workspace_by_id_from_path), access_service=Depends(get_access_service)) -> UsersInResponse:

access_service.assign_workspace_user(
user,
user_id,
workspace,
role
role_id
)

users = access_service.get_workspace_users(workspace)
return UsersInResponse(users=users)


@workspaces_users_admin_router.delete("/workspaces/{workspace_id}/users/assign", status_code=status.HTTP_202_ACCEPTED, name=strings.API_REMOVE_WORKSPACE_USER_ASSIGNMENT)
async def remove_workspace_user_assignment(user_email: str,
role_name: str,
workspace=Depends(get_workspace_by_id_from_path)) -> UsersInResponse:

access_service = get_access_service()

user = access_service.get_user_by_email(user_email)
role = access_service.get_workspace_role_by_name(role_name, workspace)
async def remove_workspace_user_assignment(user_id: str,
role_id: str,
assignmentType: AssignmentType = AssignmentType.APP_ROLE,
workspace=Depends(get_workspace_by_id_from_path),
access_service=Depends(get_access_service)) -> UsersInResponse:

access_service.remove_workspace_role_user_assignment(
user,
role,
workspace
user_id,
role_id,
workspace,
assignmentType
)

users = access_service.get_workspace_users(workspace)
Expand Down
39 changes: 30 additions & 9 deletions api_app/models/domain/authentication.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
from collections import namedtuple
from typing import List

from pydantic import BaseModel, Field

from enum import Enum

RoleAssignment = namedtuple("RoleAssignment", "resource_id, role_id")


class User(BaseModel):
id: str
name: str
email: str = Field(None)
roles: List[str] = Field([])
roleAssignments: List[RoleAssignment] = Field([])


class AssignableUser(BaseModel):
name: str
email: str


class Role(BaseModel):
id: str
value: str
Expand All @@ -30,3 +22,32 @@ class Role(BaseModel):
displayName: str
origin: str
roleAssignments: List[RoleAssignment] = Field([])

class AssignableUser(BaseModel):
id: str
displayName: str
userPrincipalName: str

class AssignmentType(Enum):
APP_ROLE = "ApplicationRole"
GROUP = "Group"

class AssignedRole(BaseModel):
id: str
displayName: str
type: AssignmentType

def __eq__(self, other):
return self.id == other.id

def __hash__(self):
return hash(self.id)

class AssignedUser(BaseModel):
id: str
displayName: str
userPrincipalName: str
roles: List[AssignedRole] = Field([])



4 changes: 2 additions & 2 deletions api_app/models/schemas/users.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from pydantic import BaseModel, Field
from typing import List

from models.domain.authentication import User, AssignableUser
from models.domain.authentication import AssignedUser, AssignableUser


class UsersInResponse(BaseModel):
users: List[User] = Field(..., title="Users", description="List of users assigned to the workspace")
users: List[AssignedUser] = Field(..., title="Users", description="List of users assigned to the workspace")

class Config:
schema_extra = {
Expand Down
99 changes: 49 additions & 50 deletions api_app/services/aad_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from services.access_service import AccessService, AuthConfigValidationError, UserRoleAssignmentError
from core import config
from db.errors import EntityDoesNotExist
from models.domain.authentication import User, AssignableUser, Role, RoleAssignment
from models.domain.authentication import AssignedRole, AssignedUser, AssignmentType, User, AssignableUser, Role, RoleAssignment
from models.domain.workspace import Workspace, WorkspaceRole
from resources import strings
from db.repositories.workspaces import WorkspaceRepository
Expand All @@ -21,6 +21,7 @@
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import json


MICROSOFT_GRAPH_URL = config.MICROSOFT_GRAPH_URL.strip("/")
Expand Down Expand Up @@ -263,14 +264,14 @@ def _get_user_details(self, roles_graph_data, msgraph_token):

return users_graph_data

def _get_roles_for_principal(self, user_id, roles_graph_data, app_id_to_role_name):
def _get_roles_for_principal(self, user_id, roles_graph_data, app_id_to_role_name, assignmentType: AssignmentType = AssignmentType.APP_ROLE) -> List[AssignedRole]:
roles = []
for role_assignment in roles_graph_data["value"]:
if role_assignment["principalId"] == user_id:
roles.append(app_id_to_role_name[role_assignment["appRoleId"]])
roles.append(AssignedRole(id=role_assignment["appRoleId"], displayName=app_id_to_role_name[role_assignment["appRoleId"]], type=assignmentType))
return roles

def _get_users_inc_groups_from_response(self, users_graph_data, roles_graph_data, app_id_to_role_name) -> List[User]:
def _get_users_inc_groups_from_response(self, users_graph_data, roles_graph_data, app_id_to_role_name) -> List[AssignedUser]:
users = []
for user_data in users_graph_data["responses"]:
if "users" in user_data["body"]["@odata.context"]:
Expand All @@ -284,7 +285,7 @@ def _get_users_inc_groups_from_response(self, users_graph_data, roles_graph_data
user_roles = self._get_roles_for_principal(user_id, roles_graph_data, app_id_to_role_name)

if not any(user.id == user_id for user in users):
users.append(User(id=user_id, name=user_name, email=user_email, roles=user_roles))
users.append(AssignedUser(id=user_id, displayName=user_name, userPrincipalName=user_email, roles=user_roles))
else:
user = next((user for user in users if user.id == user_id), None)
user.roles = list(set(user.roles + user_roles))
Expand All @@ -297,20 +298,21 @@ def _get_users_inc_groups_from_response(self, users_graph_data, roles_graph_data
user_name = group_member["displayName"]
user_email = group_member["userPrincipalName"]

group_roles = self._get_roles_for_principal(group_id, roles_graph_data, app_id_to_role_name)
group_roles = self._get_roles_for_principal(group_id, roles_graph_data, app_id_to_role_name, AssignmentType.GROUP)

if not any(user.id == user_id for user in users):
users.append(User(id=user_id, name=user_name, email=user_email, roles=group_roles))
users.append(AssignedUser(id=user_id, displayName=user_name, userPrincipalName=user_email, roles=group_roles))
else:
user = next((user for user in users if user.id == user_id), None)
user.roles = list(set(user.roles + group_roles))

return users

def get_workspace_users(self, workspace: Workspace) -> List[User]:
def get_workspace_users(self, workspace: Workspace) -> List[AssignedUser]:
msgraph_token = self._get_msgraph_token()

sp_graph_data = self._get_app_sp_graph_data(workspace.properties["client_id"])
app_id_to_role_name = {app_role["id"]: app_role["value"] for app_role in sp_graph_data["value"][0]["appRoles"]}
app_id_to_role_name = {app_role["id"]: (app_role["displayName"]) for app_role in sp_graph_data["value"][0]["appRoles"]}
roles_graph_data = self._get_user_role_assignments(workspace.properties["sp_id"], msgraph_token)
users_graph_data = self._get_user_details(roles_graph_data, msgraph_token)
users_inc_groups = self._get_users_inc_groups_from_response(users_graph_data, roles_graph_data, app_id_to_role_name)
Expand All @@ -327,7 +329,7 @@ def get_assignable_users(self, filter: str = "", maxResultCount: int = 5) -> Lis

for user_data in graph_data["value"]:
result.append(
AssignableUser(name=user_data["displayName"], email=user_data["userPrincipalName"])
AssignableUser(id=user_data["id"], displayName=user_data["displayName"], userPrincipalName=user_data["userPrincipalName"])
)

return result
Expand Down Expand Up @@ -374,72 +376,67 @@ def get_workspace_role_by_name(self, name: str, workspace: Workspace) -> Role:

return None

def get_user_by_email(self, user_email: str) -> User:
encoded_email = urllib.parse.quote(user_email)
user_by_email_endpoint = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{encoded_email}"
user_data = self._ms_graph_query(user_by_email_endpoint, "GET")

return User(id=user_data["id"][0], name=user_data["displayName"][0], email=user_email, roles=[])

def assign_workspace_user(self, user: User, workspace: Workspace, role: Role) -> None:
def assign_workspace_user(self, user_id: str, workspace: Workspace, role_id: str) -> None:
# User already has the role, do nothing
if self._is_user_in_role(user, role):
if self._is_user_in_role(user_id, role_id):
return
if self._is_workspace_role_group_in_use(workspace):
return self._assign_workspace_user_to_application_group(user, workspace, role)
return self._assign_workspace_user_to_application_group(user_id, workspace, role_id)
else:
return self._assign_workspace_user_to_application(user, workspace, role)
return self._assign_workspace_user_to_application(user_id, workspace, role_id)

def _is_user_in_role(self, user: User, role: Role) -> bool:
return any(r for r in user.roles if r == role.value)
def _is_user_in_role(self, user_id: User, role_id: Role) -> bool:
user_app_role_query = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{user_id}/appRoleAssignments"
user_app_roles = self._ms_graph_query(user_app_role_query, "GET")
return any(r for r in user_app_roles["value"] if r["appRoleId"] == role_id)

def _is_workspace_role_group_in_use(self, workspace: Workspace) -> bool:
aad_groups_in_user = workspace.properties["create_aad_groups"]
return aad_groups_in_user

def _get_workspace_group_name(self, workspace: Workspace, role: Role) -> tuple:
def _get_workspace_group_name(self, workspace: Workspace, role_id: str) -> tuple:
tre_id = workspace.properties["tre_id"]
workspace_id = workspace.properties["workspace_id"]
group_name = ""
app_role_id_suffix = ""
if role.value == "WorkspaceResearcher":
if workspace.properties["app_role_id_workspace_researcher"] == role_id:
group_name = "Workspace Researchers"
app_role_id_suffix = "workspace_researcher"
elif role.value == "WorkspaceOwner":
elif workspace.properties["app_role_id_workspace_owner"] == role_id:
group_name = "Workspace Owners"
app_role_id_suffix = "workspace_owner"
elif role.value == "AirlockManager":
elif workspace.properties["app_role_id_workspace_airlock_manager"] == role_id:
group_name = "Airlock Managers"
app_role_id_suffix = "workspace_airlock_manager"
else:
raise UserRoleAssignmentError(f"Unknown role: {role.value}")
raise UserRoleAssignmentError(f"Unknown role: {role_id}")

return (f"{tre_id}-ws-{workspace_id} {group_name}", f"app_role_id_{app_role_id_suffix}")

def _assign_workspace_user_to_application_group(self, user: User, workspace: Workspace, role: Role):
def _assign_workspace_user_to_application_group(self, user_id: str, workspace: Workspace, role_id: str):
msgraph_token = self._get_msgraph_token()
roles_graph_data = self._get_user_role_assignments(workspace.properties["sp_id"], msgraph_token)
group_details = self._get_workspace_group_name(workspace, role)
group_details = self._get_workspace_group_name(workspace, role_id)
group_name = group_details[0]
workspace_app_role_field = group_details[1]

for group in [item for item in roles_graph_data["value"] if item["principalType"] == PrincipalType.Group.value]:
if group.get("principalDisplayName") == group_name and group.get("appRoleId") == workspace.properties[workspace_app_role_field]:
self._add_user_to_group(user.id, group["principalId"])
self._add_user_to_group(user_id, group["principalId"])
return

raise UserRoleAssignmentError(f"Unable to assign user to group with role: {role.value}")
raise UserRoleAssignmentError(f"Unable to assign user to group with role: {role_id}")

def _remove_workspace_user_from_application_group(self, user: User, workspace: Workspace, role: Role):
def _remove_workspace_user_from_application_group(self, user_id: str, workspace: Workspace, role_id: str):
msgraph_token = self._get_msgraph_token()
roles_graph_data = self._get_user_role_assignments(workspace.properties["sp_id"], msgraph_token)
group_details = self._get_workspace_group_name(workspace, role)
group_details = self._get_workspace_group_name(workspace, role_id)
group_name = group_details[0]
workspace_app_role_field = group_details[1]

for group in [item for item in roles_graph_data["value"] if item["principalType"] == PrincipalType.Group.value]:
if group.get("principalDisplayName") == group_name and group.get("appRoleId") == workspace.properties[workspace_app_role_field]:
self._remove_user_from_group(user.id, group["principalId"])
self._remove_user_from_group(user_id, group["principalId"])
return
raise UserRoleAssignmentError(f"Unable to assign user to group with role: {role.value}")

Expand All @@ -459,15 +456,15 @@ def _remove_user_from_group(self, user_id: str, group_id: str):
response = requests.delete(url, headers=self._get_auth_header(msgraph_token))
return response

def _assign_workspace_user_to_application(self, user: User, workspace: Workspace, role: Role):
def _assign_workspace_user_to_application(self, user_id: str, workspace: Workspace, role_id: str):
msgraph_token = self._get_msgraph_token()
url = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{user.id}/appRoleAssignments"
url = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{user_id}/appRoleAssignments"
application_id = workspace.properties["sp_id"]

body = {
"principalId": user.id,
"principalId": user_id,
"resourceId": application_id,
"appRoleId": role.id,
"appRoleId": role_id,
}

response = requests.post(url, json=body, headers=self._get_auth_header(msgraph_token))
Expand All @@ -479,20 +476,22 @@ def _get_role_assignment_for_user(self, user_id: str, role_id: str) -> dict:
if role["appRoleId"] == role_id:
return role

def remove_workspace_role_user_assignment(self, user: User,
role: Role,
workspace: Workspace) -> None:
# Get the role assignment id for the role we want to remove
role_assignment = self._get_role_assignment_for_user(user.id, role.id)

if self._is_workspace_role_group_in_use(workspace):
self._remove_workspace_user_from_application_group(user, workspace, role)
def remove_workspace_role_user_assignment(self,
user_id: str,
role_id: str,
workspace: Workspace,
assignmentType: AssignmentType = AssignmentType.APP_ROLE
) -> None:
if assignmentType == AssignmentType.APP_ROLE:
self._remove_workspace_user_from_application(user_id, role_id)
else:
self._remove_workspace_user_from_application(user, role_assignment)
self._remove_workspace_user_from_application_group(user_id, workspace, role_id)

def _remove_workspace_user_from_application(self, user_id: str, role_id: str) -> requests.Response:
role_assignment = self._get_role_assignment_for_user(user_id, role_id)

def _remove_workspace_user_from_application(self, user: User, role_assignment: dict) -> requests.Response:
msgraph_token = self._get_msgraph_token()
url = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{user.id}/appRoleAssignments/{role_assignment['id']}"
url = f"{MICROSOFT_GRAPH_URL}/v1.0/users/{user_id}/appRoleAssignments/{role_assignment['id']}"
response = requests.delete(url, headers=self._get_auth_header(msgraph_token))
return response

Expand Down
Loading

0 comments on commit fee832a

Please sign in to comment.