|
4 | 4 | import string
|
5 | 5 | from typing import Dict, List, Union
|
6 | 6 |
|
| 7 | +from mongoengine import QuerySet |
7 | 8 | from spaceone.core import config
|
8 | 9 | from spaceone.core.service import *
|
9 | 10 | from spaceone.core.service.utils import *
|
|
22 | 23 | from spaceone.identity.manager.user_manager import UserManager
|
23 | 24 | from spaceone.identity.manager.workspace_group_manager import WorkspaceGroupManager
|
24 | 25 | from spaceone.identity.manager.workspace_manager import WorkspaceManager
|
| 26 | +from spaceone.identity.model import WorkspaceGroup |
25 | 27 | from spaceone.identity.model.user.database import User
|
26 | 28 | from spaceone.identity.model.user.response import *
|
27 | 29 | from spaceone.identity.model.user_profile.request import *
|
@@ -416,75 +418,28 @@ def get_workspace_groups(
|
416 | 418 | ) -> Union[MyWorkspaceGroupsResponse, dict]:
|
417 | 419 | """Find user
|
418 | 420 | Args:
|
419 |
| - params (UserWorkspacesRequest): { |
420 |
| - 'user_id': 'str', # injected from auth (required) |
421 |
| - 'domain_id': 'str' # injected from auth (required) |
| 421 | + params (UserProfileGetWorkspaceGroupsRequest): { |
| 422 | + 'user_id': 'str', # injected from auth (required) |
| 423 | + 'domain_id': 'str' # injected from auth (required) |
422 | 424 | }
|
423 | 425 | Returns:
|
424 |
| - MyWorkspaceResponse: |
| 426 | + MyWorkspaceGroupsResponse: |
425 | 427 | """
|
426 |
| - rb_mgr = RoleBindingManager() |
427 |
| - allow_all = False |
428 |
| - |
429 | 428 | user_vo = self.user_mgr.get_user(params.user_id, params.domain_id)
|
| 429 | + allow_all = user_vo.role_type == "DOMAIN_ADMIN" |
430 | 430 |
|
431 |
| - if user_vo.role_type == "DOMAIN_ADMIN": |
432 |
| - allow_all = True |
433 |
| - |
434 |
| - if allow_all: |
435 |
| - workspace_group_vos = self.workspace_group_mgr.filter_workspace_groups( |
436 |
| - domain_id=params.domain_id |
437 |
| - ) |
438 |
| - workspace_group_infos = [ |
439 |
| - workspace_group_vo.to_dict() |
440 |
| - for workspace_group_vo in workspace_group_vos |
441 |
| - ] |
442 |
| - else: |
443 |
| - query_filter = { |
444 |
| - "filter": [ |
445 |
| - {"key": "users.user_id", "value": params.user_id, "operator": "eq"}, |
446 |
| - {"key": "domain_id", "value": params.domain_id, "operator": "eq"}, |
447 |
| - ] |
448 |
| - } |
449 |
| - workspace_group_infos, _ = self.workspace_group_mgr.list_workspace_groups( |
450 |
| - query_filter |
451 |
| - ) |
452 |
| - |
| 431 | + workspace_group_infos = self._get_workspace_group_infos(params, allow_all) |
453 | 432 | workspace_group_ids = [
|
454 |
| - workspace_group_info["workspace_group_id"] |
455 |
| - for workspace_group_info in workspace_group_infos |
| 433 | + info["workspace_group_id"] for info in workspace_group_infos |
456 | 434 | ]
|
457 |
| - |
458 |
| - rb_vos = rb_mgr.filter_role_bindings( |
459 |
| - user_id=params.user_id, |
460 |
| - domain_id=params.domain_id, |
461 |
| - workspace_group_id=workspace_group_ids, |
462 |
| - role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], |
| 435 | + role_bindings_info_map = self._get_role_bindings_info( |
| 436 | + params, workspace_group_ids |
463 | 437 | )
|
464 |
| - role_bindings_info_map = {rb.workspace_group_id: rb.to_dict() for rb in rb_vos} |
465 |
| - |
466 |
| - workspace_group_user_ids = [] |
467 |
| - for workspace_group_info in workspace_group_infos: |
468 |
| - if not isinstance(workspace_group_info, dict): |
469 |
| - workspace_group_info = workspace_group_info.to_dict() |
470 |
| - if users := workspace_group_info.get("users", []) or []: |
471 |
| - for user in users: |
472 |
| - if isinstance(user, dict): |
473 |
| - workspace_group_user_ids.append(user.get("user_id")) |
474 |
| - elif hasattr(user, "user_id"): |
475 |
| - workspace_group_user_ids.append(user.user_id) |
476 |
| - |
477 |
| - workspace_groups_info = [] |
478 |
| - for workspace_group_info in workspace_group_infos: |
479 |
| - workspace_group_dict = ( |
480 |
| - self.workspace_group_svc.add_user_name_and_state_to_users( |
481 |
| - workspace_group_user_ids, |
482 |
| - workspace_group_info, |
483 |
| - params.domain_id, |
484 |
| - ) |
485 |
| - ) |
486 |
| - workspace_groups_info.append(workspace_group_dict) |
| 438 | + workspace_group_user_ids = self._extract_user_ids(workspace_group_infos) |
487 | 439 |
|
| 440 | + workspace_groups_info = self._add_user_name_and_state( |
| 441 | + workspace_group_infos, params.domain_id, workspace_group_user_ids |
| 442 | + ) |
488 | 443 | my_workspace_groups_info = self._get_my_workspace_groups_info(
|
489 | 444 | workspace_groups_info, role_bindings_info_map
|
490 | 445 | )
|
@@ -573,9 +528,72 @@ def _get_my_workspaces_info(
|
573 | 528 | my_workspaces_info.append(workspace_info)
|
574 | 529 | return my_workspaces_info
|
575 | 530 |
|
| 531 | + def _get_workspace_group_infos( |
| 532 | + self, params: UserProfileGetWorkspaceGroupsRequest, allow_all: bool |
| 533 | + ) -> Union[QuerySet, List[Dict[str, str]]]: |
| 534 | + if allow_all: |
| 535 | + workspace_group_vos = self.workspace_group_mgr.filter_workspace_groups( |
| 536 | + domain_id=params.domain_id |
| 537 | + ) |
| 538 | + return [vo.to_dict() for vo in workspace_group_vos] |
| 539 | + else: |
| 540 | + query_filter = { |
| 541 | + "filter": [ |
| 542 | + {"key": "users.user_id", "value": params.user_id, "operator": "eq"}, |
| 543 | + {"key": "domain_id", "value": params.domain_id, "operator": "eq"}, |
| 544 | + ] |
| 545 | + } |
| 546 | + return self.workspace_group_mgr.list_workspace_groups(query_filter)[0] |
| 547 | + |
| 548 | + @staticmethod |
| 549 | + def _get_role_bindings_info( |
| 550 | + params: UserProfileGetWorkspaceGroupsRequest, workspace_group_ids: List[str] |
| 551 | + ) -> Dict[str, Dict[str, str]]: |
| 552 | + rb_mgr = RoleBindingManager() |
| 553 | + rb_vos = rb_mgr.filter_role_bindings( |
| 554 | + user_id=params.user_id, |
| 555 | + domain_id=params.domain_id, |
| 556 | + workspace_group_id=workspace_group_ids, |
| 557 | + role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], |
| 558 | + ) |
| 559 | + return {rb.workspace_group_id: rb.to_dict() for rb in rb_vos} |
| 560 | + |
| 561 | + @staticmethod |
| 562 | + def _extract_user_ids( |
| 563 | + workspace_group_infos: Union[QuerySet, List[Dict[str, str]]] |
| 564 | + ) -> List[str]: |
| 565 | + workspace_group_user_ids = [] |
| 566 | + for workspace_group_info in workspace_group_infos: |
| 567 | + if not isinstance(workspace_group_info, dict): |
| 568 | + workspace_group_info = workspace_group_info.to_dict() |
| 569 | + if users := workspace_group_info.get("users", []) or []: |
| 570 | + for user in users: |
| 571 | + if isinstance(user, dict): |
| 572 | + workspace_group_user_ids.append(user.get("user_id")) |
| 573 | + elif hasattr(user, "user_id"): |
| 574 | + workspace_group_user_ids.append(user.user_id) |
| 575 | + |
| 576 | + return workspace_group_user_ids |
| 577 | + |
| 578 | + def _add_user_name_and_state( |
| 579 | + self, |
| 580 | + workspace_group_infos: Union[QuerySet, List[Dict[str, str]]], |
| 581 | + domain_id: str, |
| 582 | + workspace_group_user_ids: List[str], |
| 583 | + ) -> List[Union[WorkspaceGroup, Dict[str, str]]]: |
| 584 | + updated_workspace_group_infos = [] |
| 585 | + for workspace_group_info in workspace_group_infos: |
| 586 | + updated_workspace_group_infos.append( |
| 587 | + self.workspace_group_svc.add_user_name_and_state_to_users( |
| 588 | + workspace_group_info, domain_id, workspace_group_user_ids |
| 589 | + ) |
| 590 | + ) |
| 591 | + return updated_workspace_group_infos |
| 592 | + |
576 | 593 | @staticmethod
|
577 | 594 | def _get_my_workspace_groups_info(
|
578 |
| - workspace_groups_info: list, role_bindings_info_map: dict = None |
| 595 | + workspace_groups_info: List[Union[WorkspaceGroup, Dict[str, str]]], |
| 596 | + role_bindings_info_map: Dict[str, Dict[str, str]] = None, |
579 | 597 | ) -> List[Dict[str, str]]:
|
580 | 598 | my_workspace_groups_info = []
|
581 | 599 |
|
|
0 commit comments