|
1 | 1 | import logging
|
2 |
| -from typing import Union |
| 2 | +from typing import Dict, List, Union |
3 | 3 |
|
4 | 4 | from spaceone.core.error import *
|
5 | 5 | from spaceone.core.service import *
|
|
11 | 11 | from spaceone.identity.manager.project_manager import ProjectManager
|
12 | 12 | from spaceone.identity.manager.resource_manager import ResourceManager
|
13 | 13 | from spaceone.identity.manager.role_binding_manager import RoleBindingManager
|
14 |
| -from spaceone.identity.manager.service_account_manager import \ |
15 |
| - ServiceAccountManager |
16 |
| -from spaceone.identity.manager.trusted_account_manager import \ |
17 |
| - TrustedAccountManager |
18 |
| -from spaceone.identity.manager.workspace_group_manager import \ |
19 |
| - WorkspaceGroupManager |
| 14 | +from spaceone.identity.manager.service_account_manager import ServiceAccountManager |
| 15 | +from spaceone.identity.manager.trusted_account_manager import TrustedAccountManager |
| 16 | +from spaceone.identity.manager.workspace_group_manager import WorkspaceGroupManager |
20 | 17 | from spaceone.identity.manager.workspace_manager import WorkspaceManager
|
21 | 18 | from spaceone.identity.model import Workspace
|
22 | 19 | from spaceone.identity.model.workspace.request import *
|
23 | 20 | from spaceone.identity.model.workspace.response import *
|
| 21 | +from spaceone.identity.service.role_binding_service import RoleBindingService |
24 | 22 |
|
25 | 23 | _LOGGER = logging.getLogger(__name__)
|
26 | 24 |
|
@@ -86,6 +84,45 @@ def update(self, params: WorkspaceUpdateRequest) -> Union[WorkspaceResponse, dic
|
86 | 84 | )
|
87 | 85 | return WorkspaceResponse(**workspace_vo.to_dict())
|
88 | 86 |
|
| 87 | + @transaction(permission="identity:Workspace.write", role_types=["DOMAIN_ADMIN"]) |
| 88 | + @convert_model |
| 89 | + def change_workspace_group( |
| 90 | + self, params: WorkspaceChangeWorkspaceGroupRequest |
| 91 | + ) -> Union[WorkspaceResponse, dict]: |
| 92 | + """Change workspace group |
| 93 | + Args: |
| 94 | + params (WorkspaceChangeWorkspaceGroupRequest): { |
| 95 | + 'workspace_id': 'str', # required |
| 96 | + 'workspace_group_id': 'str', |
| 97 | + 'domain_id': 'str' # injected from auth (required) |
| 98 | + } |
| 99 | + Returns: |
| 100 | + WorkspaceResponse: |
| 101 | + """ |
| 102 | + workspace_id = params.workspace_id |
| 103 | + workspace_group_id = params.workspace_group_id |
| 104 | + domain_id = params.domain_id |
| 105 | + |
| 106 | + workspace_vo = self.workspace_mgr.get_workspace( |
| 107 | + workspace_id=params.workspace_id, domain_id=domain_id |
| 108 | + ) |
| 109 | + |
| 110 | + previous_workspace_group_id = workspace_vo.workspace_group_id |
| 111 | + is_updatable = True |
| 112 | + if workspace_group_id: |
| 113 | + self._add_workspace_to_group(workspace_id, workspace_group_id, domain_id) |
| 114 | + elif previous_workspace_group_id: |
| 115 | + self._remove_workspace_from_group( |
| 116 | + previous_workspace_group_id, workspace_group_id, domain_id |
| 117 | + ) |
| 118 | + |
| 119 | + if is_updatable: |
| 120 | + workspace_vo = self.workspace_mgr.update_workspace_by_vo( |
| 121 | + params.dict(exclude_unset=False), workspace_vo |
| 122 | + ) |
| 123 | + |
| 124 | + return WorkspaceResponse(**workspace_vo.to_dict()) |
| 125 | + |
89 | 126 | @transaction(permission="identity:Workspace.write", role_types=["DOMAIN_ADMIN"])
|
90 | 127 | @convert_model
|
91 | 128 | def delete(self, params: WorkspaceDeleteRequest) -> None:
|
@@ -230,6 +267,7 @@ def list(
|
230 | 267 | 'created_by': 'str',
|
231 | 268 | 'is_managed': 'bool',
|
232 | 269 | 'is_dormant': 'bool',
|
| 270 | + 'workspace_group_id': 'str', |
233 | 271 | 'domain_id': 'str', # injected from auth (required)
|
234 | 272 | }
|
235 | 273 | Returns:
|
@@ -335,3 +373,77 @@ def _delete_related_resources_in_workspace(workspace_vo: Workspace):
|
335 | 373 | _LOGGER.debug(
|
336 | 374 | f"[_delete_related_resources_in_workspace] Delete workspace group: {workspace_group_vo.name} ({workspace_group_vo.workspace_group_id})"
|
337 | 375 | )
|
| 376 | + |
| 377 | + def _add_workspace_to_group( |
| 378 | + self, workspace_id: str, workspace_group_id: str, domain_id: str |
| 379 | + ) -> bool: |
| 380 | + workspace_vo = self.workspace_mgr.get_workspace( |
| 381 | + workspace_id=workspace_id, domain_id=domain_id |
| 382 | + ) |
| 383 | + workspace_group_mgr = WorkspaceGroupManager() |
| 384 | + existing_workspace_group_id = workspace_vo.workspace_group_id |
| 385 | + is_updatable = True |
| 386 | + |
| 387 | + workspace_group_vo = workspace_group_mgr.get_workspace_group( |
| 388 | + workspace_group_id=workspace_group_id, domain_id=domain_id |
| 389 | + ) |
| 390 | + |
| 391 | + if existing_workspace_group_id: |
| 392 | + if existing_workspace_group_id != workspace_group_id: |
| 393 | + self._delete_role_bindings(existing_workspace_group_id, domain_id) |
| 394 | + |
| 395 | + self._create_role_bindings( |
| 396 | + workspace_group_vo.users, |
| 397 | + workspace_id, |
| 398 | + workspace_group_id, |
| 399 | + domain_id, |
| 400 | + ) |
| 401 | + else: |
| 402 | + is_updatable = False |
| 403 | + else: |
| 404 | + self._create_role_bindings( |
| 405 | + workspace_group_vo.users, |
| 406 | + workspace_id, |
| 407 | + workspace_group_id, |
| 408 | + domain_id, |
| 409 | + ) |
| 410 | + |
| 411 | + return is_updatable |
| 412 | + |
| 413 | + def _remove_workspace_from_group( |
| 414 | + self, previous_workspace_group_id: str, workspace_group_id: str, domain_id: str |
| 415 | + ) -> None: |
| 416 | + self._delete_role_bindings(previous_workspace_group_id, domain_id) |
| 417 | + |
| 418 | + @staticmethod |
| 419 | + def _delete_role_bindings(existing_workspace_group_id: str, domain_id: str): |
| 420 | + rb_mgr = RoleBindingManager() |
| 421 | + rb_vos = rb_mgr.filter_role_bindings( |
| 422 | + workspace_group_id=existing_workspace_group_id, |
| 423 | + domain_id=domain_id, |
| 424 | + ) |
| 425 | + for rb_vo in rb_vos: |
| 426 | + _LOGGER.debug( |
| 427 | + f"[_delete_role_bindings] Delete role binding info: {rb_vo.to_dict()}" |
| 428 | + ) |
| 429 | + rb_vo.delete() |
| 430 | + |
| 431 | + @staticmethod |
| 432 | + def _create_role_bindings( |
| 433 | + workspace_group_users: List[Dict[str, str]], |
| 434 | + workspace_id: str, |
| 435 | + workspace_group_id: str, |
| 436 | + domain_id: str, |
| 437 | + ): |
| 438 | + rb_svc = RoleBindingService() |
| 439 | + for user_info in workspace_group_users or []: |
| 440 | + rb_svc.create_role_binding( |
| 441 | + { |
| 442 | + "user_id": user_info["user_id"], |
| 443 | + "role_id": user_info["role_id"], |
| 444 | + "resource_group": "WORKSPACE", |
| 445 | + "domain_id": domain_id, |
| 446 | + "workspace_group_id": workspace_group_id, |
| 447 | + "workspace_id": workspace_id, |
| 448 | + } |
| 449 | + ) |
0 commit comments