Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions mcpgateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3231,27 +3231,33 @@ async def admin_ui(

# --------------------------------------------------------------------------------
# Validate team_id if provided (only when email-based teams are enabled).
# Non-admin users get 403 when they supply a team_id they do not belong to.
# Platform admins with unrestricted tokens (is_admin AND token_teams is None)
# bypass the membership check, consistent with the codebase admin bypass
# convention. Team-scoped admin tokens are still subject to membership checks.
# When team_id is None (not sent), selected_team_id stays None and the
# unscoped/public path works as expected.
# bypass the membership check. Team-scoped admin tokens can still view any
# team for governance. Non-admins get their team filter reset when they
# supply a team_id they do not belong to.
# --------------------------------------------------------------------------------
selected_team_id = team_id
user_email = get_user_email(user)
is_admin_user = bool(user.get("is_admin", False) if isinstance(user, dict) else getattr(user, "is_admin", False))
admin_viewing_non_member_team = False

if team_id and getattr(settings, "email_auth_enabled", False):
_is_admin = bool(user.get("is_admin", False) if isinstance(user, dict) else getattr(user, "is_admin", False))
_token_teams = user.get("token_teams") if isinstance(user, dict) else getattr(user, "token_teams", None)
if not (_is_admin and _token_teams is None):
if not (is_admin_user and _token_teams is None):
if not user_teams:
LOGGER.warning("team_id requested but user_teams not available; rejecting (team_id=%s)", team_id)
raise HTTPException(status_code=403, detail="Unable to verify team membership")

valid_team_ids = {t["id"] for t in user_teams if t.get("id")}
if str(team_id) not in valid_team_ids:
LOGGER.warning("Requested team_id is not in user's teams; rejecting (team_id=%s)", team_id)
raise HTTPException(status_code=403, detail="Not a member of the requested team")
if not is_admin_user:
LOGGER.warning("Non-admin requested team_id not in their teams; ignoring team filter (team_id=%s)", team_id)
selected_team_id = None
else:
# Admin selected a team they don't belong to; show banner and default content to All Teams
LOGGER.info("Admin viewing non-member team for governance (team_id=%s)", team_id)
admin_viewing_non_member_team = True
selected_team_id = None

# --------------------------------------------------------------------------------
# Helper: attempt to call a listing function with team_id if it supports it.
Expand Down Expand Up @@ -3600,6 +3606,7 @@ def _to_dict_and_filter(raw_list):
"mcpgateway_ui_tool_test_timeout": settings.mcpgateway_ui_tool_test_timeout,
"allow_public_visibility": settings.allow_public_visibility,
"selected_team_id": selected_team_id,
"admin_viewing_non_member_team": admin_viewing_non_member_team,
"ui_airgapped": settings.mcpgateway_ui_airgapped,
"ui_hidden_sections": ui_visibility_config["hidden_sections"],
"ui_hidden_header_items": ui_visibility_config["hidden_header_items"],
Expand Down Expand Up @@ -4739,7 +4746,8 @@ async def admin_get_all_team_ids(

# Check admin
if current_user.is_admin:
team_ids = await team_service.get_all_team_ids(include_inactive=include_inactive, visibility_filter=visibility, include_personal=True, search_query=q)
# Admin sees all non-personal teams plus their own personal team (single query)
team_ids = await team_service.get_all_team_ids(include_inactive=include_inactive, visibility_filter=visibility, include_personal=False, search_query=q, personal_owner_email=user_email)
else:
# For non-admins, get user's teams + public teams logic?
# get_user_teams gets all teams user is in.
Expand Down Expand Up @@ -4810,7 +4818,10 @@ async def admin_search_teams(
# The CALLER (admin.py) distinguishes.

if current_user.is_admin:
result = await team_service.list_teams(page=1, per_page=limit, include_inactive=include_inactive, visibility_filter=visibility, include_personal=True, search_query=search_query)
# Admin sees all non-personal teams plus their own personal team (single query)
result = await team_service.list_teams(
page=1, per_page=limit, include_inactive=include_inactive, visibility_filter=visibility, include_personal=False, search_query=search_query, personal_owner_email=user_email
)
# Result is dict {data, pagination...} (since page provided)
teams = result["data"]
else:
Expand Down Expand Up @@ -4911,9 +4922,9 @@ async def admin_teams_partial_html(
pending_requests = team_service.get_pending_join_requests_batch(user_email, list(public_team_ids))

if current_user.is_admin and not relationship:
# Admin sees all teams when no relationship filter
# Admin sees all non-personal teams plus their own personal team (single query, correct pagination)
paginated_result = await team_service.list_teams(
page=page, per_page=per_page, include_inactive=include_inactive, visibility_filter=visibility, base_url=base_url, include_personal=True, search_query=q
page=page, per_page=per_page, include_inactive=include_inactive, visibility_filter=visibility, base_url=base_url, include_personal=False, search_query=q, personal_owner_email=user_email
)
data = paginated_result["data"]
pagination = paginated_result["pagination"]
Expand Down Expand Up @@ -5116,7 +5127,8 @@ async def admin_list_teams(
if q:
base_url += f"?q={urllib.parse.quote(q, safe='')}"

paginated_result = await team_service.list_teams(page=page, per_page=per_page, base_url=base_url, include_personal=True, search_query=q)
# Admin sees all non-personal teams plus their own personal team (single query, correct pagination)
paginated_result = await team_service.list_teams(page=page, per_page=per_page, base_url=base_url, include_personal=False, search_query=q, personal_owner_email=user_email)
data = paginated_result["data"]
pagination = paginated_result["pagination"]
links = paginated_result["links"]
Expand Down Expand Up @@ -5575,6 +5587,10 @@ async def admin_get_team_edit(
if not team:
return HTMLResponse(content='<div class="text-red-500">Team not found</div>', status_code=404)

# Personal teams cannot be updated (service rejects all personal team updates)
if team.is_personal:
return HTMLResponse(content='<div class="text-red-500">Personal teams cannot be edited</div>', status_code=403)

safe_team_name = html.escape(team.name, quote=True)
safe_description = html.escape(team.description or "")
edit_form = rf"""
Expand Down Expand Up @@ -5714,7 +5730,18 @@ async def admin_update_team(

# Update team
user_email = getattr(user, "email", None) or str(user)
await team_service.update_team(team_id=team_id, name=name, description=description, visibility=visibility, updated_by=user_email)
updated = await team_service.update_team(team_id=team_id, name=name, description=description, visibility=visibility, updated_by=user_email)

if not updated:
is_htmx = request.headers.get("HX-Request") == "true"
error_html = '<div class="text-red-500 p-3 bg-red-50 dark:bg-red-900/20 rounded-md mb-4">Team cannot be updated</div>'
if is_htmx:
response = HTMLResponse(content=error_html, status_code=400)
response.headers["HX-Retarget"] = "#edit-team-error"
response.headers["HX-Reswap"] = "innerHTML"
return response
error_msg = urllib.parse.quote("Team cannot be updated")
return RedirectResponse(url=f"{root_path}/admin/?error={error_msg}#teams", status_code=303)

# Check if this is an HTMX request
is_htmx = request.headers.get("HX-Request") == "true"
Expand Down Expand Up @@ -5775,7 +5802,10 @@ async def admin_delete_team(

# Delete team (get user email from JWT payload)
user_email = get_user_email(user)
await team_service.delete_team(team_id, deleted_by=user_email)
deleted = await team_service.delete_team(team_id, deleted_by=user_email)

if not deleted:
return HTMLResponse(content='<div class="text-red-500">Team cannot be deleted</div>', status_code=400)

# Return success message with script to refresh teams list
safe_team_name = html.escape(team_name)
Expand Down
24 changes: 22 additions & 2 deletions mcpgateway/services/team_management_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,7 @@ async def list_teams(
base_url: Optional[str] = None,
include_personal: bool = False,
search_query: Optional[str] = None,
personal_owner_email: Optional[str] = None,
) -> Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
"""List teams with pagination support (cursor or page based).

Expand All @@ -1279,6 +1280,7 @@ async def list_teams(
base_url: Base URL for pagination links
include_personal: Whether to include personal teams
search_query: Search term for name/slug/description
personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

Returns:
Union[Tuple[List[EmailTeam], Optional[str]], Dict[str, Any]]:
Expand All @@ -1288,7 +1290,15 @@ async def list_teams(
query = select(EmailTeam)

if not include_personal:
query = query.where(EmailTeam.is_personal.is_(False))
if personal_owner_email:
query = query.where(
or_(
EmailTeam.is_personal.is_(False),
and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email),
)
)
else:
query = query.where(EmailTeam.is_personal.is_(False))

if not include_inactive:
query = query.where(EmailTeam.is_active.is_(True))
Expand Down Expand Up @@ -1340,6 +1350,7 @@ async def get_all_team_ids(
visibility_filter: Optional[str] = None,
include_personal: bool = False,
search_query: Optional[str] = None,
personal_owner_email: Optional[str] = None,
) -> List[int]:
"""Get all team IDs matching criteria (unpaginated).

Expand All @@ -1348,14 +1359,23 @@ async def get_all_team_ids(
visibility_filter: Filter by visibility (private, team, public)
include_personal: Whether to include personal teams
search_query: Search term for name/slug
personal_owner_email: When set (and include_personal=False), includes this user's personal team alongside non-personal teams

Returns:
List[int]: List of team IDs
"""
query = select(EmailTeam.id)

if not include_personal:
query = query.where(EmailTeam.is_personal.is_(False))
if personal_owner_email:
query = query.where(
or_(
EmailTeam.is_personal.is_(False),
and_(EmailTeam.is_personal.is_(True), EmailTeam.created_by == personal_owner_email),
)
)
else:
query = query.where(EmailTeam.is_personal.is_(False))

if not include_inactive:
query = query.where(EmailTeam.is_active.is_(True))
Expand Down
16 changes: 16 additions & 0 deletions mcpgateway/templates/admin.html
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,22 @@ <h1 class="text-lg font-semibold text-gray-800 dark:text-gray-200 hidden sm:bloc
</div>
</header>

<!-- Admin Non-Member Team Banner -->
{% if admin_viewing_non_member_team %}
<div class="bg-red-100 dark:bg-red-900/30 border-b border-red-300 dark:border-red-800">
<div class="px-4 py-3">
<div class="flex items-center">
<svg class="h-5 w-5 text-red-600 dark:text-red-400 mr-3 flex-shrink-0" fill="currentColor" viewBox="0 0 20 20">
<path fill-rule="evenodd" d="M18 10a8 8 0 11-16 0 8 8 0 0116 0zm-7-4a1 1 0 11-2 0 1 1 0 012 0zM9 9a1 1 0 000 2v3a1 1 0 001 1h1a1 1 0 100-2v-3a1 1 0 00-1-1H9z" clip-rule="evenodd" />
</svg>
<p class="text-sm font-semibold text-red-800 dark:text-red-200">
⚠ Since you are not a member of this team, team content won't be visible. Defaulting to All Teams.
</p>
</div>
</div>
</div>
{% endif %}

<!-- Global notification container for flash messages -->
<div id="global-notification" class="px-4 mt-4" style="display: none;">
<!-- Notification content will be inserted here -->
Expand Down
28 changes: 14 additions & 14 deletions tests/playwright/entities/test_agents_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,15 @@ def test_view_modal_different_agents_show_different_data(

# View first agent - get name from table
first_row = agents_page.get_agent_row(0)
first_name = first_row.locator("td").nth(2).text_content().strip()
first_name = first_row.locator("td").nth(3).text_content().strip()
_open_view_modal(agents_page, 0)
details = agents_page.page.locator("#agent-details")
expect(details).to_contain_text(first_name)
_close_view_modal(agents_page)

# View second agent
second_row = agents_page.get_agent_row(1)
second_name = second_row.locator("td").nth(2).text_content().strip()
second_name = second_row.locator("td").nth(3).text_content().strip()
_open_view_modal(agents_page, 1)
expect(details).to_contain_text(second_name)
_close_view_modal(agents_page)
Expand Down Expand Up @@ -567,7 +567,7 @@ def test_edit_modal_cancel_does_not_save(self, agents_page: AgentsPage):

# Get original name from table
first_row = agents_page.get_agent_row(0)
original_name = first_row.locator("td").nth(2).text_content().strip()
original_name = first_row.locator("td").nth(3).text_content().strip()

_open_edit_modal(agents_page, 0)

Expand All @@ -588,7 +588,7 @@ def test_edit_modal_cancel_does_not_save(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
current_name = first_row.locator("td").nth(2).text_content().strip()
current_name = first_row.locator("td").nth(3).text_content().strip()
assert current_name == original_name, (
f"Name should be unchanged after Cancel: expected '{original_name}', "
f"got '{current_name}'"
Expand Down Expand Up @@ -1016,8 +1016,8 @@ def test_agent_name_displayed_in_row(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
# Name is in column index 2
name_cell = first_row.locator("td").nth(2)
# Name is in column index 3 (after Actions, S.No., Agent ID)
name_cell = first_row.locator("td").nth(3)
name_text = name_cell.text_content().strip()
assert len(name_text) > 0, "Agent name should not be empty"

Expand All @@ -1028,8 +1028,8 @@ def test_endpoint_url_displayed_in_row(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
# Endpoint is in column index 4
endpoint_cell = first_row.locator("td").nth(4)
# Endpoint is in column index 5 (after Actions, S.No., Agent ID, Name, Description)
endpoint_cell = first_row.locator("td").nth(5)
endpoint_text = endpoint_cell.text_content().strip()
assert len(endpoint_text) > 0, "Endpoint URL should not be empty"
assert "://" in endpoint_text, (
Expand All @@ -1043,8 +1043,8 @@ def test_description_displayed_in_row(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
# Description is in column index 3
desc_cell = first_row.locator("td").nth(3)
# Description is in column index 4 (after Actions, S.No., Agent ID, Name)
desc_cell = first_row.locator("td").nth(4)
# Description may be empty but the cell should exist
expect(desc_cell).to_be_attached()

Expand All @@ -1055,8 +1055,8 @@ def test_tags_displayed_as_badges(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
# Tags column is at index 5
tags_cell = first_row.locator("td").nth(5)
# Tags column is at index 6 (after Actions, S.No., Agent ID, Name, Description, Endpoint)
tags_cell = first_row.locator("td").nth(6)

# Check if there are any tag badges (spans with inline-flex styling)
tag_badges = tags_cell.locator("span")
Expand All @@ -1074,8 +1074,8 @@ def test_visibility_badge_displayed(self, agents_page: AgentsPage):
_skip_if_no_agents(agents_page)

first_row = agents_page.get_agent_row(0)
# Visibility is the last column, index 11
visibility_cell = first_row.locator("td").nth(11)
# Visibility is the last column, index 12
visibility_cell = first_row.locator("td").nth(12)
visibility_text = visibility_cell.text_content().strip()
assert visibility_text in [
"Public",
Expand Down
Loading
Loading