Skip to content

Commit 6d34b8b

Browse files
committed
test(security): close diff-coverage gaps in PR #4341 hybrid policy helpers
diff-cover surfaced five PR #4341 lines that no test exercised. All five are policy-critical paths in the new hybrid visibility helpers; without coverage, a regression that only broke one branch could ship undetected. Added 17 new tests across 4 files. Coverage gaps closed: - server_service.py:1022-1044 (was 35.7% diff coverage). New TestServerAccessCheckMatrix in test_authorization_access.py with 7 tests mirroring the TestCheckToolAccess pattern in test_tool_service.py. Each test names the production line(s) it covers (no-user-email deny, public-only token deny, own-private allow, JWT team match, DB team lookup, fall-through deny). - gateway_service.py:2734,2739-2761 (was 32.1%). New TestGatewayAccessCheckMatrix with the same 7-test shape; gateway and server helpers are sibling implementations of the canonical hybrid policy. - base_service.py:216 (was 90%). New test_apply_visibility_scope_db_admin_includes_own_private_only — the existing test_apply_visibility_scope_admin_bypass_excludes_private only covered (None, None); the (email, None) DB-admin branch was unexecuted. Uses the same exact-OR-count predicate guard as the other DB-admin tests so a too-broad predicate fails. - a2a_service.py:1155 (was 91.7%). New test_get_agent_card_returns_none_when_visibility_denies covers the deny path of the cycle-2 S6-a in-service gate. - auth_context.py:215 (was 99.1%). The non-object payload guard in decode_internal_mcp_auth_context had a test, but the test was named 'testdecode_*' (missing underscore separator) so pytest's default 'test_*' collection pattern rejected it — the assertion never ran. This is the same bug class as cycle-1 B6. Renamed to test_decode_internal_mcp_auth_context_rejects_non_object_payload with a docstring naming the bug class. Other testCASE-without-underscore names exist elsewhere (test_resource_service.py:2007/2024/2033, test_a2a_service.py:942, test_toolops_altk_service.py:59) but predate PR #4341 and are out of scope. Worth filing a separate cleanup issue. Verified: 8443 tests pass across services/, utils/test_gateway_access.py, test_internal_a2a_endpoints.py, test_main.py, test_main_extended.py, test_admin.py (+17 from this commit; +1 of those is the previously-broken testdecode_* now actually running). Signed-off-by: Jonathan Springer <jps@s390x.com>
1 parent 6a94728 commit 6d34b8b

5 files changed

Lines changed: 229 additions & 8 deletions

File tree

.secrets.baseline

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"files": "(?x)( package-lock\\.json$ |Cargo\\.lock$ |uv\\.lock$ |go\\.sum$ |mcpgateway/sri_hashes\\.json$ )|^.secrets.baseline$",
44
"lines": null
55
},
6-
"generated_at": "2026-04-26T08:27:41Z",
6+
"generated_at": "2026-04-26T09:19:42Z",
77
"plugins_used": [
88
{
99
"name": "AWSKeyDetector"
@@ -7450,23 +7450,23 @@
74507450
"hashed_secret": "d92342976d720ff38cf5dcb329be41959ab1ba6c",
74517451
"is_secret": false,
74527452
"is_verified": false,
7453-
"line_number": 3268,
7453+
"line_number": 3292,
74547454
"type": "Secret Keyword",
74557455
"verified_result": null
74567456
},
74577457
{
74587458
"hashed_secret": "a343c9b5b1abfcf385d5c6f6dc0282f4d88a416e",
74597459
"is_secret": false,
74607460
"is_verified": false,
7461-
"line_number": 3429,
7461+
"line_number": 3453,
74627462
"type": "Secret Keyword",
74637463
"verified_result": null
74647464
},
74657465
{
74667466
"hashed_secret": "c00dbbc9dadfbe1e232e93a729dd4752fade0abf",
74677467
"is_secret": false,
74687468
"is_verified": false,
7469-
"line_number": 3461,
7469+
"line_number": 3485,
74707470
"type": "Secret Keyword",
74717471
"verified_result": null
74727472
}
@@ -9262,15 +9262,15 @@
92629262
"hashed_secret": "6745fa570d36be08400efa1cbc2f057bb001290e",
92639263
"is_secret": false,
92649264
"is_verified": false,
9265-
"line_number": 2515,
9265+
"line_number": 2523,
92669266
"type": "Hex High Entropy String",
92679267
"verified_result": null
92689268
},
92699269
{
92709270
"hashed_secret": "4f13f134744a2fadbbe2d624687246347d12fa63",
92719271
"is_secret": false,
92729272
"is_verified": false,
9273-
"line_number": 2803,
9273+
"line_number": 2811,
92749274
"type": "Hex High Entropy String",
92759275
"verified_result": null
92769276
}

tests/unit/mcpgateway/services/test_a2a_service.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2680,6 +2680,30 @@ async def test_get_agent_card_returns_none_when_agent_missing(self, service, moc
26802680

26812681
assert await service.get_agent_card(mock_db, "missing") is None
26822682

2683+
async def test_get_agent_card_returns_none_when_visibility_denies(self, service, mock_db):
2684+
"""PR #4341 (S6-a) coverage: in-service gate returns None on visibility deny.
2685+
2686+
Exercises a2a_service.py:1155 — the deny path of the gate ``get_agent_card``
2687+
adopted in cycle 2. A private agent owned by a different user with the
2688+
anonymous-bypass shape must return None (not raise, not return the card).
2689+
"""
2690+
agent = SimpleNamespace(
2691+
name="ag",
2692+
description="desc",
2693+
endpoint_url="https://x.com",
2694+
version=1,
2695+
protocol_version="1.0",
2696+
capabilities={},
2697+
visibility="private",
2698+
team_id=None,
2699+
owner_email="other@example.com",
2700+
)
2701+
mock_db.execute.return_value.scalar_one_or_none.return_value = agent
2702+
2703+
result = await service.get_agent_card(mock_db, "ag", user_email=None, token_teams=None)
2704+
2705+
assert result is None
2706+
26832707
async def test_get_agent_card_builds_capabilities(self, service, mock_db):
26842708
agent = SimpleNamespace(
26852709
name="ag",

tests/unit/mcpgateway/services/test_authorization_access.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""
1515

1616
# Standard
17+
from types import SimpleNamespace
1718
from unittest.mock import AsyncMock, MagicMock, Mock, patch
1819

1920
# Third-Party
@@ -1401,3 +1402,152 @@ async def test_tag_apply_visibility_scope_admin_bypass_excludes_private(self):
14011402
assert "visibility" in compiled
14021403
assert "private" in compiled
14031404
assert "!=" in compiled or "<>" in compiled
1405+
1406+
1407+
class TestServerAccessCheckMatrix:
1408+
"""Branch coverage for ServerService._check_server_access (server_service.py:1015-1044).
1409+
1410+
Mirrors the existing TestCheckToolAccess matrix in test_tool_service.py so that
1411+
every documented policy outcome (public allow, anonymous bypass deny private,
1412+
public-only token deny, own-private allow, team membership via JWT, team
1413+
membership via DB lookup, final deny) has at least one focused regression.
1414+
"""
1415+
1416+
@pytest.fixture
1417+
def service(self):
1418+
from mcpgateway.services.server_service import ServerService
1419+
1420+
return ServerService()
1421+
1422+
@staticmethod
1423+
def _server(visibility: str, owner_email=None, team_id=None) -> MagicMock:
1424+
s = MagicMock()
1425+
s.visibility = visibility
1426+
s.owner_email = owner_email
1427+
s.team_id = team_id
1428+
return s
1429+
1430+
@pytest.mark.asyncio
1431+
async def test_public_always_allowed(self, service):
1432+
"""Public visibility short-circuits before any other check."""
1433+
s = self._server("public")
1434+
assert await service._check_server_access(MagicMock(), s, user_email=None, token_teams=[]) is True
1435+
assert await service._check_server_access(MagicMock(), s, user_email="any@test.com", token_teams=["team-x"]) is True
1436+
1437+
@pytest.mark.asyncio
1438+
async def test_anonymous_admin_bypass_denies_private(self, service):
1439+
"""(None, None) anonymous admin bypass: team allowed, private denied."""
1440+
assert await service._check_server_access(MagicMock(), self._server("private", owner_email="other@test.com"), user_email=None, token_teams=None) is False
1441+
assert await service._check_server_access(MagicMock(), self._server("team", team_id="team-x"), user_email=None, token_teams=None) is True
1442+
1443+
@pytest.mark.asyncio
1444+
async def test_no_user_email_with_token_teams_denied(self, service):
1445+
"""(None, [team]) shape: server_service.py line 1022-1023 — without user_email, denied."""
1446+
assert await service._check_server_access(MagicMock(), self._server("team", team_id="team-x"), user_email=None, token_teams=["team-x"]) is False
1447+
1448+
@pytest.mark.asyncio
1449+
async def test_public_only_token_denies_non_public(self, service):
1450+
"""(email, []) public-only token: covers server_service.py line 1025-1027."""
1451+
s_team = self._server("team", team_id="team-x")
1452+
s_private = self._server("private", owner_email="user@test.com")
1453+
assert await service._check_server_access(MagicMock(), s_team, user_email="user@test.com", token_teams=[]) is False
1454+
assert await service._check_server_access(MagicMock(), s_private, user_email="user@test.com", token_teams=[]) is False
1455+
1456+
@pytest.mark.asyncio
1457+
async def test_own_private_allowed(self, service):
1458+
"""Owner of a private server can read it (covers line 1029-1031)."""
1459+
own = self._server("private", owner_email="user@test.com")
1460+
assert await service._check_server_access(MagicMock(), own, user_email="user@test.com", token_teams=["team-x"]) is True
1461+
1462+
@pytest.mark.asyncio
1463+
async def test_team_member_via_jwt_token_teams(self, service):
1464+
"""token_teams from JWT carries team membership (covers line 1033-1036)."""
1465+
s = self._server("team", team_id="team-x")
1466+
assert await service._check_server_access(MagicMock(), s, user_email="user@test.com", token_teams=["team-x"]) is True
1467+
# Non-matching team in JWT → denied (covers line 1044 fall-through).
1468+
assert await service._check_server_access(MagicMock(), s, user_email="user@test.com", token_teams=["team-y"]) is False
1469+
1470+
@pytest.mark.asyncio
1471+
async def test_team_member_via_db_lookup(self, service):
1472+
"""token_teams=None with email + team server: falls back to TeamManagementService (covers line 1038-1042)."""
1473+
s = self._server("team", team_id="team-x")
1474+
1475+
with patch("mcpgateway.services.server_service.TeamManagementService") as mock_tms_cls:
1476+
mock_tms_cls.return_value.get_user_teams = AsyncMock(return_value=[SimpleNamespace(id="team-x")])
1477+
allowed = await service._check_server_access(MagicMock(), s, user_email="user@test.com", token_teams=None)
1478+
1479+
assert allowed is True
1480+
mock_tms_cls.return_value.get_user_teams.assert_awaited_once_with("user@test.com")
1481+
1482+
1483+
class TestGatewayAccessCheckMatrix:
1484+
"""Branch coverage for GatewayService._check_gateway_access (gateway_service.py:2732-2761).
1485+
1486+
Same shape as TestServerAccessCheckMatrix above — both helpers were added by
1487+
PR #4341 and both implement the canonical hybrid visibility policy.
1488+
"""
1489+
1490+
@pytest.fixture
1491+
def service(self):
1492+
from mcpgateway.services.gateway_service import GatewayService
1493+
1494+
return GatewayService()
1495+
1496+
@staticmethod
1497+
def _gw(visibility: str, owner_email=None, team_id=None) -> MagicMock:
1498+
g = MagicMock()
1499+
g.visibility = visibility
1500+
g.owner_email = owner_email
1501+
g.team_id = team_id
1502+
return g
1503+
1504+
@pytest.mark.asyncio
1505+
async def test_public_always_allowed(self, service):
1506+
"""Covers gateway_service.py line 2734."""
1507+
g = self._gw("public")
1508+
assert await service._check_gateway_access(MagicMock(), g, user_email=None, token_teams=[]) is True
1509+
1510+
@pytest.mark.asyncio
1511+
async def test_anonymous_admin_bypass_denies_private(self, service):
1512+
"""(None, None) anonymous admin bypass: team allowed, private denied."""
1513+
assert await service._check_gateway_access(MagicMock(), self._gw("private", owner_email="other@test.com"), user_email=None, token_teams=None) is False
1514+
assert await service._check_gateway_access(MagicMock(), self._gw("team", team_id="team-x"), user_email=None, token_teams=None) is True
1515+
1516+
@pytest.mark.asyncio
1517+
async def test_no_user_email_with_token_teams_denied(self, service):
1518+
"""(None, [team]) shape: covers gateway_service.py line 2739-2740."""
1519+
assert await service._check_gateway_access(MagicMock(), self._gw("team", team_id="team-x"), user_email=None, token_teams=["team-x"]) is False
1520+
1521+
@pytest.mark.asyncio
1522+
async def test_public_only_token_denies_non_public(self, service):
1523+
"""(email, []) public-only token: covers line 2742-2744."""
1524+
g_team = self._gw("team", team_id="team-x")
1525+
g_private = self._gw("private", owner_email="user@test.com")
1526+
assert await service._check_gateway_access(MagicMock(), g_team, user_email="user@test.com", token_teams=[]) is False
1527+
assert await service._check_gateway_access(MagicMock(), g_private, user_email="user@test.com", token_teams=[]) is False
1528+
1529+
@pytest.mark.asyncio
1530+
async def test_own_private_allowed(self, service):
1531+
"""Owner of a private gateway can read it (covers line 2746-2748)."""
1532+
own = self._gw("private", owner_email="user@test.com")
1533+
assert await service._check_gateway_access(MagicMock(), own, user_email="user@test.com", token_teams=["team-x"]) is True
1534+
1535+
@pytest.mark.asyncio
1536+
async def test_team_member_via_jwt_token_teams(self, service):
1537+
"""token_teams from JWT carries team membership (covers line 2750-2753)."""
1538+
g = self._gw("team", team_id="team-x")
1539+
assert await service._check_gateway_access(MagicMock(), g, user_email="user@test.com", token_teams=["team-x"]) is True
1540+
# Non-matching team in JWT → denied (covers line 2761 fall-through).
1541+
assert await service._check_gateway_access(MagicMock(), g, user_email="user@test.com", token_teams=["team-y"]) is False
1542+
1543+
@pytest.mark.asyncio
1544+
async def test_team_member_via_db_lookup(self, service):
1545+
"""token_teams=None with email + team gateway: falls back to TeamManagementService (covers line 2755-2759)."""
1546+
g = self._gw("team", team_id="team-x")
1547+
1548+
with patch("mcpgateway.services.gateway_service.TeamManagementService") as mock_tms_cls:
1549+
mock_tms_cls.return_value.get_user_teams = AsyncMock(return_value=[SimpleNamespace(id="team-x")])
1550+
allowed = await service._check_gateway_access(MagicMock(), g, user_email="user@test.com", token_teams=None)
1551+
1552+
assert allowed is True
1553+
mock_tms_cls.return_value.get_user_teams.assert_awaited_once_with("user@test.com")

tests/unit/mcpgateway/services/test_base_service.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,45 @@ async def test_db_lookup_fallback_no_user_email(self, service, mock_db, query):
187187
assert result == "filtered"
188188
query.where.assert_called_once()
189189

190+
def test_apply_visibility_scope_db_admin_includes_own_private_only(self):
191+
"""PR #4341 carve-out coverage for the static ``_apply_visibility_scope`` helper.
192+
193+
``_apply_visibility_scope`` is the sibling of ``_apply_access_control`` used by
194+
completion/tag enumeration. Without this test the (email, None) DB-admin
195+
branch in base_service.py:215-221 was unexecuted by the suite; the existing
196+
``test_apply_visibility_scope_admin_bypass_excludes_private`` coverage in
197+
test_authorization_access.py only exercises the ``(None, None)`` shape.
198+
"""
199+
from mcpgateway.services.base_service import BaseService
200+
from mcpgateway.services.tag_service import TagService
201+
202+
service = TagService()
203+
stmt = sa.select(_FakeItem)
204+
mock_db = MagicMock()
205+
206+
with patch("mcpgateway.services.base_service.is_user_admin", return_value=True):
207+
scoped = BaseService._apply_visibility_scope(
208+
stmt,
209+
_FakeItem,
210+
user_email="dba@test.com",
211+
token_teams=None,
212+
team_ids=[],
213+
db=mock_db,
214+
)
215+
216+
compiled = _compile_where(scoped)
217+
assert "visibility != 'private'" in compiled, f"public/team carve-out missing: {compiled}"
218+
assert "visibility = 'private'" in compiled, f"own-private allowance missing: {compiled}"
219+
assert "owner_email = 'dba@test.com'" in compiled, f"owner clause must bind caller: {compiled}"
220+
# Same OR-count guard as test_db_admin_bypass_includes_own_private_only.
221+
or_count = compiled.upper().count(" OR ")
222+
assert or_count == 1, f"expected exactly 1 OR in WHERE clause, got {or_count}: {compiled}"
223+
# Sanity: TagService._apply_visibility_scope is unused here (the static
224+
# helper on BaseService is what we exercise) but importing it ensures the
225+
# module-level binding exists so this test fails loudly if a refactor
226+
# removes the indirection.
227+
assert callable(getattr(service, "_apply_visibility_scope", None))
228+
190229
@pytest.mark.asyncio
191230
async def test_db_admin_bypass_includes_own_private_only(self, service, mock_db):
192231
"""PR #4341 carve-out: DB-admin (email, None) shape compiles a WHERE that allows own private but not others'.

tests/unit/mcpgateway/test_main_extended.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,8 +1166,16 @@ def test_serialize_legacy_tool_payloads_preserves_dicts_and_unknowns(self):
11661166
class TestInternalMcpHelperCoverage:
11671167
"""Target helper branches added for trusted Rust MCP forwarding."""
11681168

1169-
def testdecode_internal_mcp_auth_context_rejects_non_object_payload(self):
1170-
"""Non-object JSON payloads should be rejected."""
1169+
def test_decode_internal_mcp_auth_context_rejects_non_object_payload(self):
1170+
"""Non-object JSON payloads (lists, scalars) must raise ValueError.
1171+
1172+
The pre-fix name ``testdecode_*`` was missing the underscore separator
1173+
and silently failed pytest's default ``test_*`` collection pattern, so
1174+
the assertion below — guarding ``auth_context.decode_internal_mcp_auth_context``
1175+
line 215 — was never executed. A non-object payload smuggled past this
1176+
gate would let the helper return a list or scalar that the trust-header
1177+
plumbing assumes is a dict.
1178+
"""
11711179
header_value = base64.urlsafe_b64encode(orjson.dumps(["not-an-object"])).decode().rstrip("=")
11721180
with pytest.raises(ValueError, match="must be an object"):
11731181
decode_internal_mcp_auth_context(header_value)

0 commit comments

Comments
 (0)