diff --git a/changes/11236.feature.md b/changes/11236.feature.md new file mode 100644 index 00000000000..fe06f3c19f1 --- /dev/null +++ b/changes/11236.feature.md @@ -0,0 +1 @@ +Add an effective permissions resolver in the permission controller service and repository layers that returns all permitted operations a user can perform on given entities by traversing the RBAC scope chain. diff --git a/src/ai/backend/manager/data/permission/role.py b/src/ai/backend/manager/data/permission/role.py index 8bfba832612..74f10a0936e 100644 --- a/src/ai/backend/manager/data/permission/role.py +++ b/src/ai/backend/manager/data/permission/role.py @@ -1,6 +1,7 @@ from __future__ import annotations import uuid +from collections.abc import Mapping from dataclasses import dataclass, field from datetime import datetime @@ -94,6 +95,28 @@ class ScopeChainPermissionCheckInput: permission_entity_type: EntityType | None +@dataclass(frozen=True) +class EffectivePermissionsInput: + """Input for resolving effective permissions per entity for a given user. + + Given a user, an element type, and a list of entity IDs, returns the + set of permitted operations per entity by traversing the scope chain + and evaluating all role/permission assignments. + """ + + user_id: uuid.UUID + target_element_type: RBACElementType + target_entity_ids: list[str] + permission_entity_type: EntityType | None = None + + +@dataclass(frozen=True) +class EffectivePermissionsResult: + """Mapping from entity ID to the set of operations the user is authorized to perform.""" + + permissions: Mapping[str, set[OperationType]] + + @dataclass(frozen=True) class BulkPermissionCheckInput: user_id: uuid.UUID diff --git a/src/ai/backend/manager/repositories/permission_controller/db_source/db_source.py b/src/ai/backend/manager/repositories/permission_controller/db_source/db_source.py index 48387d2f0ca..535f88bc790 100644 --- a/src/ai/backend/manager/repositories/permission_controller/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/permission_controller/db_source/db_source.py @@ -1,5 +1,6 @@ import logging import uuid +from collections import defaultdict from collections.abc import Collection, Iterable, Sequence from dataclasses import dataclass, field from typing import Any, cast @@ -40,6 +41,8 @@ BulkRoleRevocationFailure, BulkRoleRevocationResultData, BulkUserRoleRevocationInput, + EffectivePermissionsInput, + EffectivePermissionsResult, ProjectRoleCount, RoleListResult, RolePermissionsUpdateInput, @@ -1088,6 +1091,88 @@ async def check_bulk_permission_with_scope_chain( ) return {eid: eid in granted for eid in data.target_entity_ids} + async def resolve_effective_permissions( + self, + data: EffectivePermissionsInput, + ) -> EffectivePermissionsResult: + """Resolve the effective permissions for a user across multiple entities. + + Uses a single batched query that traverses the scope chain (AUTO edges) + and self-scope permissions to collect all operations the user can perform + on each entity. + + Returns a mapping from entity ID to the set of permitted operations. + """ + if not data.target_entity_ids: + return EffectivePermissionsResult(permissions={}) + + association_entity_type = data.target_element_type.to_entity_type() + permission_entity_type = data.permission_entity_type or association_entity_type + target_scope_type = data.target_element_type.to_scope_type() + + perm = PermissionRow.__table__ + user_roles = UserRoleRow.__table__ + roles = RoleRow.__table__ + + scope_chain_cte = self._build_scope_chain_cte( + association_entity_type, data.target_entity_ids + ) + scope_chain_query = ( + sa.select( + scope_chain_cte.c.entity_id, + perm.c.operation, + ) + .select_from( + scope_chain_cte.join( + perm, + sa.and_( + perm.c.scope_type == scope_chain_cte.c.scope_type, + perm.c.scope_id == scope_chain_cte.c.scope_id, + ), + ) + .join(roles, roles.c.id == perm.c.role_id) + .join(user_roles, user_roles.c.role_id == roles.c.id) + ) + .where( + sa.and_( + user_roles.c.user_id == data.user_id, + roles.c.status == RoleStatus.ACTIVE, + perm.c.entity_type == permission_entity_type, + ) + ) + ) + + self_scope_query = ( + sa.select( + perm.c.scope_id.label("entity_id"), + perm.c.operation, + ) + .select_from( + perm.join(roles, roles.c.id == perm.c.role_id).join( + user_roles, user_roles.c.role_id == roles.c.id + ) + ) + .where( + sa.and_( + user_roles.c.user_id == data.user_id, + roles.c.status == RoleStatus.ACTIVE, + perm.c.scope_type == target_scope_type, + perm.c.scope_id.in_(data.target_entity_ids), + perm.c.entity_type == permission_entity_type, + ) + ) + ) + + combined_query = sa.union_all(scope_chain_query, self_scope_query) + + permissions: defaultdict[str, set[OperationType]] = defaultdict(set) + async with self._db.begin_readonly_session_read_committed() as db_session: + result = await db_session.execute(combined_query) + for row in result: + permissions[row.entity_id].add(OperationType(row.operation)) + + return EffectivePermissionsResult(permissions=permissions) + async def bulk_assign_role( self, bulk_creator: BulkCreator[UserRoleRow] ) -> BulkCreatorResultWithFailures[UserRoleRow]: diff --git a/src/ai/backend/manager/repositories/permission_controller/repository.py b/src/ai/backend/manager/repositories/permission_controller/repository.py index 919dcac0c74..dba1b23a8d0 100644 --- a/src/ai/backend/manager/repositories/permission_controller/repository.py +++ b/src/ai/backend/manager/repositories/permission_controller/repository.py @@ -31,6 +31,8 @@ BulkRoleAssignmentResultData, BulkRoleRevocationResultData, BulkUserRoleRevocationInput, + EffectivePermissionsInput, + EffectivePermissionsResult, RoleData, RoleDetailData, RoleListResult, @@ -363,6 +365,18 @@ async def check_bulk_permission_with_scope_chain( """ return await self._db_source.check_bulk_permission_with_scope_chain(data) + @permission_controller_repository_resilience.apply() + async def resolve_effective_permissions( + self, + data: EffectivePermissionsInput, + ) -> EffectivePermissionsResult: + """Resolve the set of permitted operations per entity for a given user. + + For each target entity, traverses the scope chain (AUTO edges) and + self-scope permissions to collect all operations the user can perform. + """ + return await self._db_source.resolve_effective_permissions(data) + # -- role invitation -- @permission_controller_repository_resilience.apply() diff --git a/src/ai/backend/manager/services/permission_contoller/actions/__init__.py b/src/ai/backend/manager/services/permission_contoller/actions/__init__.py index 437207973cd..a5e78091c0b 100644 --- a/src/ai/backend/manager/services/permission_contoller/actions/__init__.py +++ b/src/ai/backend/manager/services/permission_contoller/actions/__init__.py @@ -7,6 +7,10 @@ from .get_permission_matrix import GetPermissionMatrixAction, GetPermissionMatrixActionResult from .get_role_detail import GetRoleDetailAction, GetRoleDetailActionResult from .purge_role import PurgeRoleAction, PurgeRoleActionResult +from .resolve_effective_permissions import ( + ResolveEffectivePermissionsAction, + ResolveEffectivePermissionsActionResult, +) from .revoke_role import RevokeRoleAction, RevokeRoleActionResult from .search_permissions import ( SearchPermissionsAction, @@ -47,6 +51,8 @@ "GetRoleDetailActionResult", "PurgeRoleAction", "PurgeRoleActionResult", + "ResolveEffectivePermissionsAction", + "ResolveEffectivePermissionsActionResult", "RevokeRoleAction", "RevokeRoleActionResult", "SearchRolesAction", diff --git a/src/ai/backend/manager/services/permission_contoller/actions/resolve_effective_permissions.py b/src/ai/backend/manager/services/permission_contoller/actions/resolve_effective_permissions.py new file mode 100644 index 00000000000..5011a49cea2 --- /dev/null +++ b/src/ai/backend/manager/services/permission_contoller/actions/resolve_effective_permissions.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass, field +from typing import override +from uuid import UUID + +from ai.backend.common.data.permission.types import EntityType, OperationType, RBACElementType +from ai.backend.manager.actions.action import BaseAction, BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType + + +@dataclass +class ResolveEffectivePermissionsAction(BaseAction): + """Action to resolve effective permissions per entity for a given user. + + Given a user ID, an element type, and a list of entity IDs, returns the + set of permitted operations per entity by traversing the scope chain and + evaluating all role/permission assignments. + """ + + user_id: UUID + target_element_type: RBACElementType + target_entity_ids: list[str] + permission_entity_type: EntityType | None = None + + @override + def entity_id(self) -> str | None: + return str(self.user_id) + + @override + @classmethod + def entity_type(cls) -> EntityType: + return EntityType.PERMISSION + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.GET + + +@dataclass +class ResolveEffectivePermissionsActionResult(BaseActionResult): + """Result containing the effective permissions per entity.""" + + permissions: Mapping[str, set[OperationType]] = field(default_factory=dict) + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/permission_contoller/service.py b/src/ai/backend/manager/services/permission_contoller/service.py index ee61f9efe00..041d1e540fb 100644 --- a/src/ai/backend/manager/services/permission_contoller/service.py +++ b/src/ai/backend/manager/services/permission_contoller/service.py @@ -17,6 +17,7 @@ RejectRoleInvitationAction, ) from ai.backend.manager.data.permission.role import ( + EffectivePermissionsInput, UserRoleRevocationData, ) from ai.backend.manager.data.role_invitation.types import RoleInvitationData @@ -74,6 +75,10 @@ PurgeRoleAction, PurgeRoleActionResult, ) +from ai.backend.manager.services.permission_contoller.actions.resolve_effective_permissions import ( + ResolveEffectivePermissionsAction, + ResolveEffectivePermissionsActionResult, +) from ai.backend.manager.services.permission_contoller.actions.revoke_role import ( RevokeRoleAction, RevokeRoleActionResult, @@ -360,6 +365,24 @@ async def get_permission_matrix( actions[action_cls.action_name()] = perm return GetPermissionMatrixActionResult(matrix=result) + async def resolve_effective_permissions( + self, action: ResolveEffectivePermissionsAction + ) -> ResolveEffectivePermissionsActionResult: + """Resolve the set of permitted operations per entity for a given user. + + Traverses the scope chain and evaluates all role/permission assignments + to return all operations the user is authorized to perform on each entity. + """ + result = await self._repository.resolve_effective_permissions( + EffectivePermissionsInput( + user_id=action.user_id, + target_element_type=action.target_element_type, + target_entity_ids=action.target_entity_ids, + permission_entity_type=action.permission_entity_type, + ) + ) + return ResolveEffectivePermissionsActionResult(permissions=result.permissions) + async def create_role_invitation_by_email( self, action: CreateRoleInvitationByEmailAction ) -> CreateRoleInvitationResult: diff --git a/tests/unit/manager/repositories/permission_controller/test_resolve_effective_permissions.py b/tests/unit/manager/repositories/permission_controller/test_resolve_effective_permissions.py new file mode 100644 index 00000000000..171af153089 --- /dev/null +++ b/tests/unit/manager/repositories/permission_controller/test_resolve_effective_permissions.py @@ -0,0 +1,888 @@ +""" +Tests for PermissionDBSource.resolve_effective_permissions(). +Covers batched scope chain traversal returning per-entity operation sets. +""" + +from __future__ import annotations + +import uuid +from collections.abc import AsyncGenerator +from dataclasses import dataclass, field + +import pytest + +from ai.backend.common.data.permission.types import ( + RBACElementType, + RelationType, +) +from ai.backend.manager.data.permission.role import EffectivePermissionsInput +from ai.backend.manager.data.permission.status import RoleStatus +from ai.backend.manager.data.permission.types import ( + EntityType, + OperationType, + ScopeType, +) +from ai.backend.manager.data.user.types import UserStatus +from ai.backend.manager.models.domain import DomainRow +from ai.backend.manager.models.keypair import KeyPairRow +from ai.backend.manager.models.rbac_models import UserRoleRow +from ai.backend.manager.models.rbac_models.association_scopes_entities import ( + AssociationScopesEntitiesRow, +) +from ai.backend.manager.models.rbac_models.permission.object_permission import ObjectPermissionRow +from ai.backend.manager.models.rbac_models.permission.permission import PermissionRow +from ai.backend.manager.models.rbac_models.role import RoleRow +from ai.backend.manager.models.resource_policy import ( + KeyPairResourcePolicyRow, + UserResourcePolicyRow, +) +from ai.backend.manager.models.user import UserRow +from ai.backend.manager.models.utils import ExtendedAsyncSAEngine +from ai.backend.manager.repositories.permission_controller.db_source.db_source import ( + PermissionDBSource, +) +from ai.backend.testutils.db import with_tables + + +@dataclass +class PermissionEntry: + """A single permission to create in permission_setup fixture.""" + + scope_key: str + operation: OperationType + entity_type: EntityType = EntityType.VFOLDER + + +@dataclass +class EffectiveFixture: + """Pre-built fixture data for effective permissions tests.""" + + user_id: uuid.UUID + role_id: uuid.UUID + domain_id: str = field(default_factory=lambda: str(uuid.uuid4())) + project_id: str = field(default_factory=lambda: str(uuid.uuid4())) + vfolder_ids: list[str] = field(default_factory=list) + # Extra IDs for multi-project / multi-domain tests + domain_b_id: str = field(default_factory=lambda: str(uuid.uuid4())) + project_b_id: str = field(default_factory=lambda: str(uuid.uuid4())) + project_c_id: str = field(default_factory=lambda: str(uuid.uuid4())) + # Extra role for multi-role tests + role_b_id: uuid.UUID = field(default_factory=uuid.uuid4) + + def __post_init__(self) -> None: + if not self.vfolder_ids: + self.vfolder_ids = [str(uuid.uuid4()) for _ in range(3)] + + +class TestResolveEffectivePermissions: + """Tests for batched effective permissions resolution.""" + + @pytest.fixture + async def db_with_rbac_tables( + self, + database_connection: ExtendedAsyncSAEngine, + ) -> AsyncGenerator[ExtendedAsyncSAEngine, None]: + async with with_tables( + database_connection, + [ + DomainRow, + UserResourcePolicyRow, + KeyPairResourcePolicyRow, + RoleRow, + UserRoleRow, + UserRow, + KeyPairRow, + PermissionRow, + ObjectPermissionRow, + AssociationScopesEntitiesRow, + ], + ): + yield database_connection + + @pytest.fixture + def db_source( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + ) -> PermissionDBSource: + return PermissionDBSource(db_with_rbac_tables) + + @pytest.fixture + def fixture_ids(self) -> EffectiveFixture: + return EffectiveFixture( + user_id=uuid.uuid4(), + role_id=uuid.uuid4(), + ) + + # ── User + role fixtures ── + + @pytest.fixture + async def user_with_active_role( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> EffectiveFixture: + """Create a user with an active role (no permissions yet).""" + async with db_with_rbac_tables.begin_session() as db_sess: + policy = UserResourcePolicyRow( + name="test-rbac-policy", + max_vfolder_count=0, + max_quota_scope_size=-1, + max_session_count_per_model_session=0, + max_customized_image_count=0, + ) + db_sess.add(policy) + user = UserRow( + uuid=fixture_ids.user_id, + email="testuser@test.com", + resource_policy="test-rbac-policy", + status=UserStatus.ACTIVE, + need_password_change=False, + sudo_session_enabled=False, + ) + db_sess.add(user) + await db_sess.flush() + + role = RoleRow( + id=fixture_ids.role_id, + name="test-role", + description="Test role for effective permissions", + ) + db_sess.add(role) + await db_sess.flush() + + user_role = UserRoleRow( + user_id=fixture_ids.user_id, + role_id=fixture_ids.role_id, + ) + db_sess.add(user_role) + await db_sess.flush() + + return fixture_ids + + @pytest.fixture + async def user_with_inactive_role( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> EffectiveFixture: + """Create a user with an inactive role.""" + async with db_with_rbac_tables.begin_session() as db_sess: + policy = UserResourcePolicyRow( + name="test-rbac-policy", + max_vfolder_count=0, + max_quota_scope_size=-1, + max_session_count_per_model_session=0, + max_customized_image_count=0, + ) + db_sess.add(policy) + user = UserRow( + uuid=fixture_ids.user_id, + email="testuser@test.com", + resource_policy="test-rbac-policy", + status=UserStatus.ACTIVE, + need_password_change=False, + sudo_session_enabled=False, + ) + db_sess.add(user) + await db_sess.flush() + + role = RoleRow( + id=fixture_ids.role_id, + name="inactive-role", + status=RoleStatus.INACTIVE, + ) + db_sess.add(role) + await db_sess.flush() + + user_role = UserRoleRow( + user_id=fixture_ids.user_id, + role_id=fixture_ids.role_id, + ) + db_sess.add(user_role) + await db_sess.flush() + + return fixture_ids + + @pytest.fixture + async def user_with_two_roles( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> EffectiveFixture: + """Create a user with two active roles (no permissions yet).""" + async with db_with_rbac_tables.begin_session() as db_sess: + policy = UserResourcePolicyRow( + name="test-rbac-policy", + max_vfolder_count=0, + max_quota_scope_size=-1, + max_session_count_per_model_session=0, + max_customized_image_count=0, + ) + db_sess.add(policy) + user = UserRow( + uuid=fixture_ids.user_id, + email="testuser@test.com", + resource_policy="test-rbac-policy", + status=UserStatus.ACTIVE, + need_password_change=False, + sudo_session_enabled=False, + ) + db_sess.add(user) + await db_sess.flush() + + role_a = RoleRow( + id=fixture_ids.role_id, + name="role-a", + ) + role_b = RoleRow( + id=fixture_ids.role_b_id, + name="role-b", + ) + db_sess.add(role_a) + db_sess.add(role_b) + await db_sess.flush() + + db_sess.add(UserRoleRow(user_id=fixture_ids.user_id, role_id=fixture_ids.role_id)) + db_sess.add(UserRoleRow(user_id=fixture_ids.user_id, role_id=fixture_ids.role_b_id)) + await db_sess.flush() + + return fixture_ids + + # ── Association fixtures ── + + @pytest.fixture + async def all_vfolders_in_project_auto( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> None: + """All vfolders belong to the same PROJECT (auto edge).""" + async with db_with_rbac_tables.begin_session() as db_sess: + for vfolder_id in fixture_ids.vfolder_ids: + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + entity_id=vfolder_id, + relation_type=RelationType.AUTO, + ) + ) + await db_sess.flush() + + @pytest.fixture + async def project_in_domain_auto( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> None: + """PROJECT belongs to DOMAIN (auto edge).""" + async with db_with_rbac_tables.begin_session() as db_sess: + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=fixture_ids.domain_id, + entity_type=EntityType.PROJECT, + entity_id=fixture_ids.project_id, + relation_type=RelationType.AUTO, + ) + ) + await db_sess.flush() + + @pytest.fixture + async def mixed_vfolder_edges( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> None: + """vfolder[0] AUTO, vfolder[1] REF, vfolder[2] no association.""" + async with db_with_rbac_tables.begin_session() as db_sess: + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + entity_id=fixture_ids.vfolder_ids[0], + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + entity_id=fixture_ids.vfolder_ids[1], + relation_type=RelationType.REF, + ) + ) + await db_sess.flush() + + @pytest.fixture + async def multi_project_two_domains( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> None: + """Two domains with separate project chains. + + domain_a ← project_a ← vfolder[0] + domain_a ← project_b ← vfolder[1] + domain_b ← project_c ← vfolder[2] + """ + f = fixture_ids + async with db_with_rbac_tables.begin_session() as db_sess: + # domain_a ← project_a ← vfolder[0] + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=f.domain_id, + entity_type=EntityType.PROJECT, + entity_id=f.project_id, + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=f.project_id, + entity_type=EntityType.VFOLDER, + entity_id=f.vfolder_ids[0], + relation_type=RelationType.AUTO, + ) + ) + # domain_a ← project_b ← vfolder[1] + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=f.domain_id, + entity_type=EntityType.PROJECT, + entity_id=f.project_b_id, + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=f.project_b_id, + entity_type=EntityType.VFOLDER, + entity_id=f.vfolder_ids[1], + relation_type=RelationType.AUTO, + ) + ) + # domain_b ← project_c ← vfolder[2] + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=f.domain_b_id, + entity_type=EntityType.PROJECT, + entity_id=f.project_c_id, + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=f.project_c_id, + entity_type=EntityType.VFOLDER, + entity_id=f.vfolder_ids[2], + relation_type=RelationType.AUTO, + ) + ) + await db_sess.flush() + + # ── Permission fixture ── + + @pytest.fixture + async def permission_setup( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + request: pytest.FixtureRequest, + ) -> None: + scope_map: dict[str, tuple[ScopeType, str]] = { + "vfolder_0": (ScopeType.VFOLDER, fixture_ids.vfolder_ids[0]), + "vfolder_1": (ScopeType.VFOLDER, fixture_ids.vfolder_ids[1]), + "project": (ScopeType.PROJECT, fixture_ids.project_id), + "domain": (ScopeType.DOMAIN, fixture_ids.domain_id), + } + for entry in request.param: + scope_type, scope_id = scope_map[entry.scope_key] + async with db_with_rbac_tables.begin_session() as db_sess: + db_sess.add( + PermissionRow( + role_id=fixture_ids.role_id, + scope_type=scope_type, + scope_id=scope_id, + entity_type=entry.entity_type, + operation=entry.operation, + ) + ) + await db_sess.flush() + + # ── Helpers ── + + def _make_input( + self, + fixture: EffectiveFixture, + entity_type: EntityType = EntityType.VFOLDER, + ) -> EffectivePermissionsInput: + return EffectivePermissionsInput( + user_id=fixture.user_id, + target_element_type=RBACElementType.VFOLDER, + target_entity_ids=fixture.vfolder_ids, + permission_entity_type=entity_type, + ) + + # ── Tests: empty / no permission ── + + async def test_empty_input_returns_empty( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + ) -> None: + """Empty target_entity_ids returns empty dict.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + EffectivePermissionsInput( + user_id=fixture.user_id, + target_element_type=RBACElementType.VFOLDER, + target_entity_ids=[], + ) + ) + assert result.permissions == {} + + async def test_no_permission_returns_empty_sets( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + ) -> None: + """No permissions assigned -> all entities get empty operation sets.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == set() + + # ── Tests: scope chain ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("project", OperationType.READ)], + id="project-read", + ) + ], + indirect=["permission_setup"], + ) + async def test_single_operation_via_project_scope( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + permission_setup: None, + ) -> None: + """All vfolders in same project with READ -> all get {READ}.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert len(result.permissions) == 3 + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == {OperationType.READ} + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [ + PermissionEntry("project", OperationType.READ), + PermissionEntry("project", OperationType.UPDATE), + PermissionEntry("project", OperationType.SOFT_DELETE), + ], + id="project-multiple-ops", + ) + ], + indirect=["permission_setup"], + ) + async def test_multiple_operations_via_project_scope( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + permission_setup: None, + ) -> None: + """Multiple operations at project scope propagate to all vfolders.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + expected = {OperationType.READ, OperationType.UPDATE, OperationType.SOFT_DELETE} + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == expected + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("domain", OperationType.UPDATE)], + id="domain-update", + ) + ], + indirect=["permission_setup"], + ) + async def test_operations_via_domain_chain( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + project_in_domain_auto: None, + permission_setup: None, + ) -> None: + """Permission at DOMAIN scope propagates through chain to all vfolders.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == {OperationType.UPDATE} + + # ── Tests: self-scope ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("vfolder_0", OperationType.READ)], + id="self-scope-v0", + ) + ], + indirect=["permission_setup"], + ) + async def test_self_scope_grants_individually( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + permission_setup: None, + ) -> None: + """Self-scope permission on vfolder[0] only; others empty.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert result.permissions[fixture.vfolder_ids[0]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[1]] == set() + assert result.permissions[fixture.vfolder_ids[2]] == set() + + # ── Tests: mixed edges ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("project", OperationType.READ)], + id="project-read", + ) + ], + indirect=["permission_setup"], + ) + async def test_mixed_edges_only_auto_gets_operations( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + mixed_vfolder_edges: None, + permission_setup: None, + ) -> None: + """AUTO edge -> {READ}, REF edge -> empty, no edge -> empty.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert result.permissions[fixture.vfolder_ids[0]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[1]] == set() + assert result.permissions[fixture.vfolder_ids[2]] == set() + + # ── Tests: chain + self-scope combined ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [ + PermissionEntry("project", OperationType.READ), + PermissionEntry("vfolder_1", OperationType.UPDATE), + ], + id="chain-read-self-update", + ) + ], + indirect=["permission_setup"], + ) + async def test_chain_and_self_scope_union( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + mixed_vfolder_edges: None, + permission_setup: None, + ) -> None: + """vfolder[0] gets READ via chain, vfolder[1] gets UPDATE via self-scope.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert result.permissions[fixture.vfolder_ids[0]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[1]] == {OperationType.UPDATE} + assert result.permissions[fixture.vfolder_ids[2]] == set() + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [ + PermissionEntry("project", OperationType.READ), + PermissionEntry("vfolder_0", OperationType.UPDATE), + ], + id="chain-plus-self-on-same-entity", + ) + ], + indirect=["permission_setup"], + ) + async def test_chain_and_self_scope_merge_on_same_entity( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + permission_setup: None, + ) -> None: + """vfolder[0] gets {READ, UPDATE}: READ from chain, UPDATE from self-scope.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert result.permissions[fixture.vfolder_ids[0]] == { + OperationType.READ, + OperationType.UPDATE, + } + assert result.permissions[fixture.vfolder_ids[1]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[2]] == {OperationType.READ} + + # ── Tests: multi-domain ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("domain", OperationType.READ)], + id="domain-read", + ) + ], + indirect=["permission_setup"], + ) + async def test_domain_chain_multi_project_with_foreign_vfolder( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + multi_project_two_domains: None, + permission_setup: None, + ) -> None: + """Domain permission grants vfolders in child projects but not in another domain.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + assert result.permissions[fixture.vfolder_ids[0]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[1]] == {OperationType.READ} + assert result.permissions[fixture.vfolder_ids[2]] == set() + + # ── Tests: inactive role ── + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("project", OperationType.READ)], + id="project-read", + ) + ], + indirect=["permission_setup"], + ) + async def test_inactive_role_returns_empty_sets( + self, + db_source: PermissionDBSource, + user_with_inactive_role: EffectiveFixture, + all_vfolders_in_project_auto: None, + permission_setup: None, + ) -> None: + """Inactive role does not grant any operations.""" + fixture = user_with_inactive_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == set() + + # ── Tests: user isolation ── + + async def test_other_user_permissions_not_leaked( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + db_with_rbac_tables: ExtendedAsyncSAEngine, + all_vfolders_in_project_auto: None, + ) -> None: + """Permissions for another user do not leak into target user's results.""" + fixture = user_with_active_role + other_user_id = uuid.uuid4() + other_role_id = uuid.uuid4() + async with db_with_rbac_tables.begin_session() as db_sess: + db_sess.add( + UserRow( + uuid=other_user_id, + email="other@test.com", + resource_policy="test-rbac-policy", + status=UserStatus.ACTIVE, + need_password_change=False, + sudo_session_enabled=False, + ) + ) + await db_sess.flush() + db_sess.add(RoleRow(id=other_role_id, name="other-role")) + await db_sess.flush() + db_sess.add(UserRoleRow(user_id=other_user_id, role_id=other_role_id)) + db_sess.add( + PermissionRow( + role_id=other_role_id, + scope_type=ScopeType.PROJECT, + scope_id=fixture.project_id, + entity_type=EntityType.VFOLDER, + operation=OperationType.READ, + ) + ) + await db_sess.flush() + + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == set() + + # ── Tests: multi-role union ── + + async def test_multiple_roles_union_operations( + self, + db_source: PermissionDBSource, + user_with_two_roles: EffectiveFixture, + db_with_rbac_tables: ExtendedAsyncSAEngine, + all_vfolders_in_project_auto: None, + ) -> None: + """Operations from multiple roles are unioned together.""" + fixture = user_with_two_roles + async with db_with_rbac_tables.begin_session() as db_sess: + # role_a grants READ at project scope + db_sess.add( + PermissionRow( + role_id=fixture.role_id, + scope_type=ScopeType.PROJECT, + scope_id=fixture.project_id, + entity_type=EntityType.VFOLDER, + operation=OperationType.READ, + ) + ) + # role_b grants UPDATE at project scope + db_sess.add( + PermissionRow( + role_id=fixture.role_b_id, + scope_type=ScopeType.PROJECT, + scope_id=fixture.project_id, + entity_type=EntityType.VFOLDER, + operation=OperationType.UPDATE, + ) + ) + await db_sess.flush() + + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + expected = {OperationType.READ, OperationType.UPDATE} + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == expected + + # ── Tests: cycle detection ── + + @pytest.fixture + async def scope_chain_with_cycle( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: EffectiveFixture, + ) -> None: + """Create a cycle: vfolders -> project -> domain -> project (back-edge).""" + async with db_with_rbac_tables.begin_session() as db_sess: + for vfolder_id in fixture_ids.vfolder_ids: + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + entity_id=vfolder_id, + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=fixture_ids.domain_id, + entity_type=EntityType.PROJECT, + entity_id=fixture_ids.project_id, + relation_type=RelationType.AUTO, + ) + ) + db_sess.add( + AssociationScopesEntitiesRow( + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.DOMAIN, + entity_id=fixture_ids.domain_id, + relation_type=RelationType.AUTO, + ) + ) + await db_sess.flush() + + @pytest.mark.parametrize( + "permission_setup", + [ + pytest.param( + [PermissionEntry("domain", OperationType.READ)], + id="domain-read", + ) + ], + indirect=["permission_setup"], + ) + async def test_scope_chain_cycle_terminates_with_permission( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + scope_chain_with_cycle: None, + permission_setup: None, + ) -> None: + """Cyclic scope chain terminates without infinite recursion; permission is found.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert OperationType.READ in result.permissions[vfolder_id] + + async def test_scope_chain_cycle_terminates_without_permission( + self, + db_source: PermissionDBSource, + user_with_active_role: EffectiveFixture, + scope_chain_with_cycle: None, + ) -> None: + """Cyclic scope chain terminates without infinite recursion; no operations granted.""" + fixture = user_with_active_role + result = await db_source.resolve_effective_permissions( + self._make_input(fixture), + ) + for vfolder_id in fixture.vfolder_ids: + assert result.permissions[vfolder_id] == set()