|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import pytest |
| 4 | +import requests |
| 5 | +from simple_logger.logger import get_logger |
| 6 | + |
| 7 | +from tests.model_serving.maas_billing.maas_subscription.utils import ( |
| 8 | + bulk_revoke_api_keys, |
| 9 | + get_api_key, |
| 10 | + resolve_api_key_username, |
| 11 | +) |
| 12 | + |
| 13 | +LOGGER = get_logger(name=__name__) |
| 14 | + |
| 15 | + |
| 16 | +@pytest.mark.usefixtures( |
| 17 | + "maas_subscription_controller_enabled_latest", |
| 18 | + "maas_gateway_api", |
| 19 | + "maas_api_gateway_reachable", |
| 20 | +) |
| 21 | +class TestAPIKeyBulkOperations: |
| 22 | + """Tests for MaaS API key bulk revoke operations.""" |
| 23 | + |
| 24 | + @pytest.mark.tier1 |
| 25 | + @pytest.mark.parametrize("ocp_token_for_actor", [{"type": "admin"}], indirect=True) |
| 26 | + def test_bulk_revoke_own_keys( |
| 27 | + self, |
| 28 | + request_session_http: requests.Session, |
| 29 | + base_url: str, |
| 30 | + ocp_token_for_actor: str, |
| 31 | + three_active_api_key_ids: list[str], |
| 32 | + ) -> None: |
| 33 | + """Verify a user can bulk revoke all their own active API keys. |
| 34 | + """ |
| 35 | + username = resolve_api_key_username( |
| 36 | + request_session_http=request_session_http, |
| 37 | + base_url=base_url, |
| 38 | + key_id=three_active_api_key_ids[0], |
| 39 | + ocp_user_token=ocp_token_for_actor, |
| 40 | + ) |
| 41 | + |
| 42 | + bulk_resp, bulk_body = bulk_revoke_api_keys( |
| 43 | + request_session_http=request_session_http, |
| 44 | + base_url=base_url, |
| 45 | + ocp_user_token=ocp_token_for_actor, |
| 46 | + username=username, |
| 47 | + ) |
| 48 | + assert bulk_resp.status_code == 200, ( |
| 49 | + f"Expected 200 on bulk-revoke for user {username}, " |
| 50 | + f"got {bulk_resp.status_code}: {bulk_resp.text[:200]}" |
| 51 | + ) |
| 52 | + revoked_count = bulk_body.get("revokedCount", 0) |
| 53 | + assert revoked_count >= 3, f"Expected at least 3 revoked keys, got revokedCount={revoked_count}" |
| 54 | + LOGGER.info(f"[bulk-revoke] User {username} bulk revoked {revoked_count} key(s)") |
| 55 | + |
| 56 | + for key_id in three_active_api_key_ids: |
| 57 | + get_resp, get_body = get_api_key( |
| 58 | + request_session_http=request_session_http, |
| 59 | + base_url=base_url, |
| 60 | + key_id=key_id, |
| 61 | + ocp_user_token=ocp_token_for_actor, |
| 62 | + ) |
| 63 | + if get_resp.status_code == 200: |
| 64 | + assert get_body.get("status") == "revoked", ( |
| 65 | + f"Expected key id={key_id} to have status='revoked', got: {get_body.get('status')}" |
| 66 | + ) |
| 67 | + LOGGER.info(f"[bulk-revoke] All {len(three_active_api_key_ids)} key(s) confirmed revoked") |
| 68 | + |
| 69 | + @pytest.mark.tier1 |
| 70 | + @pytest.mark.parametrize("ocp_token_for_actor", [{"type": "free"}], indirect=True) |
| 71 | + def test_bulk_revoke_other_user_forbidden( |
| 72 | + self, |
| 73 | + request_session_http: requests.Session, |
| 74 | + base_url: str, |
| 75 | + ocp_token_for_actor: str, |
| 76 | + ) -> None: |
| 77 | + """Verify a non-admin user gets 403 when attempting to bulk revoke another user's keys. |
| 78 | + """ |
| 79 | + bulk_resp, _ = bulk_revoke_api_keys( |
| 80 | + request_session_http=request_session_http, |
| 81 | + base_url=base_url, |
| 82 | + ocp_user_token=ocp_token_for_actor, |
| 83 | + username="someotheruser", |
| 84 | + ) |
| 85 | + assert bulk_resp.status_code == 403, ( |
| 86 | + f"Expected 403 (non-admin cannot bulk revoke other users), " |
| 87 | + f"got {bulk_resp.status_code}: {bulk_resp.text[:200]}" |
| 88 | + ) |
| 89 | + LOGGER.info("[bulk-revoke] Non-admin correctly received 403 when attempting to bulk revoke another user's keys") |
| 90 | + |
| 91 | + @pytest.mark.tier1 |
| 92 | + @pytest.mark.parametrize("ocp_token_for_actor", [{"type": "free"}], indirect=True) |
| 93 | + def test_bulk_revoke_admin_can_revoke_any_user( |
| 94 | + self, |
| 95 | + request_session_http: requests.Session, |
| 96 | + base_url: str, |
| 97 | + active_api_key_id: str, |
| 98 | + free_user_username: str, |
| 99 | + admin_ocp_token: str, |
| 100 | + ) -> None: |
| 101 | + """Verify an admin can bulk revoke any user's active API keys. |
| 102 | + """ |
| 103 | + bulk_resp, bulk_body = bulk_revoke_api_keys( |
| 104 | + request_session_http=request_session_http, |
| 105 | + base_url=base_url, |
| 106 | + ocp_user_token=admin_ocp_token, |
| 107 | + username=free_user_username, |
| 108 | + ) |
| 109 | + assert bulk_resp.status_code == 200, ( |
| 110 | + f"Expected 200 on admin bulk-revoke for user {free_user_username}, " |
| 111 | + f"got {bulk_resp.status_code}: {bulk_resp.text[:200]}" |
| 112 | + ) |
| 113 | + revoked_count = bulk_body.get("revokedCount", 0) |
| 114 | + assert revoked_count >= 1, f"Expected at least 1 revoked key, got revokedCount={revoked_count}" |
| 115 | + LOGGER.info(f"[bulk-revoke] Admin successfully revoked {revoked_count} key(s) for user {free_user_username}") |
0 commit comments