from typing import override
from ai.backend.common.contexts.user import current_user
from ai.backend.manager.actions.action import BaseActionTriggerMeta
from ai.backend.manager.actions.action.scope import BaseScopeAction
from ai.backend.manager.data.permission.id import ScopeId
from ai.backend.manager.data.permission.role import ScopePermissionCheckInput
from ai.backend.manager.data.permission.types import EntityType, ScopeType
from ai.backend.manager.errors.rbac import RBACForbidden
from ai.backend.manager.errors.user import UserNotFound
from ai.backend.manager.repositories.permission_controller.repository import (
PermissionControllerRepository,
)
from ...validator.scope import ScopeActionValidator
class ScopeActionRBACValidator(ScopeActionValidator):
    """Authorize a scope-bound action against the RBAC permission store.

    The validator resolves the caller from the request context and asks the
    permission repository whether that caller may perform the action's
    operation on the action's entity type inside the action's scope.  It
    fails closed: a missing caller or a negative answer from the repository
    both raise; nothing is returned on success.
    """

    def __init__(
        self,
        repository: PermissionControllerRepository,
    ) -> None:
        # Repository that answers scope-level permission queries.
        self._repository = repository

    @override
    async def validate(self, action: BaseScopeAction, meta: BaseActionTriggerMeta) -> None:
        """Raise unless the current user may run ``action`` in its scope.

        Args:
            action: The scope-bound action; supplies the entity type, scope
                type, scope id and permission operation that are checked.
            meta: Trigger metadata; accepted for interface compatibility and
                not consulted by this validator.

        Raises:
            UserNotFound: No user is bound to the current context.  The
                request is refused rather than evaluated anonymously.
            RBACForbidden: The repository did not grant the permission.
            ValueError: Presumably raised if ``action.entity_type()`` or
                ``action.scope_type()`` yields a value outside the
                ``EntityType`` / ``ScopeType`` types (they are coerced before
                the user lookup) — confirm against those definitions.
        """
        # Coerce the action's raw descriptors first, so a malformed action is
        # rejected regardless of who is calling.
        target_entity = EntityType(action.entity_type())
        target_scope_type = ScopeType(action.scope_type())
        target_scope_id = action.scope_id()

        actor = current_user()
        if actor is None:
            # Fail closed: with no identity in context there is nothing to
            # authorize, so refuse instead of proceeding.
            raise UserNotFound("User not found in context")

        query = ScopePermissionCheckInput(
            user_id=actor.user_id,
            operation=action.permission_operation_type(),
            target_entity_type=target_entity,
            target_scope_id=ScopeId(
                scope_type=target_scope_type,
                scope_id=target_scope_id,
            ),
        )
        granted = await self._repository.check_permission_in_scope(query)

        # Any falsy answer from the repository (False or None) is a denial.
        if not granted:
            raise RBACForbidden(
                "User does not have permission to perform this action in the specified scope "
                f"({target_scope_type.value}:{target_scope_id})"
            )