"""
Test suite for verifying user and group permissions for the Model Registry.
This suite tests various RBAC scenarios including:
- Basic user access permissions (admin vs normal user)
- Group-based access control
- User addition to groups and permission changes
- Role and RoleBinding management
"""
import pytest
from typing import Self, Generator, List
from simple_logger.logger import get_logger
from model_registry import ModelRegistry as ModelRegistryClient
from timeout_sampler import TimeoutSampler
from ocp_resources.data_science_cluster import DataScienceCluster
from ocp_resources.group import Group
from ocp_resources.role_binding import RoleBinding
from ocp_resources.secret import Secret
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.service import Service
from ocp_resources.deployment import Deployment
from tests.model_registry.rbac.multiple_instance_utils import MR_MULTIPROJECT_TEST_SCENARIO_PARAMS
from tests.model_registry.rbac.utils import (
build_mr_client_args,
assert_positive_mr_registry,
assert_forbidden_access,
)
from tests.model_registry.constants import NUM_MR_INSTANCES
from utilities.infra import get_openshift_token
from mr_openapi.exceptions import ForbiddenException
from utilities.user_utils import UserTestSession
from kubernetes.dynamic import DynamicClient
from utilities.resources.model_registry_modelregistry_opendatahub_io import ModelRegistry
from tests.model_registry.utils import get_mr_service_by_label, get_endpoint_from_mr_service, get_mr_user_token
from tests.model_registry.rbac.utils import grant_mr_access, revoke_mr_access
from utilities.constants import Protocols
# Module-level logger used by the tests below to record each access probe's outcome.
LOGGER = get_logger(name=__name__)

# Applied to every test in this module: each test runs with the IDP test user
# (`test_idp_user`); `original_user` presumably restores the original login
# afterwards — confirm against the fixture definitions.
pytestmark = [
    pytest.mark.usefixtures("original_user", "test_idp_user"),
]
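@pytest.mark.usefixtures(
    "updated_dsc_component_state_scope_session",
    "model_registry_namespace",
    "model_registry_metadata_db_resources",
    "model_registry_instance",
)
@pytest.mark.custom_namespace
class TestUserPermission:
    """
    RBAC checks against a single Model Registry instance.

    A user with no grant must be rejected (HTTP 403); the same user must gain
    access once granted through a group or a per-user RoleBinding.
    """

    @staticmethod
    def _wait_for_mr_access(rest_endpoint: str, token: str, wait_timeout: int) -> None:
        """
        Poll ``assert_positive_mr_registry`` against *rest_endpoint* until it succeeds.

        Role/RoleBinding changes are reconciled asynchronously, so the first probe
        after a grant may still be rejected. ``TimeoutSampler`` re-runs the probe
        every 5 seconds for up to *wait_timeout* seconds; only the first successful
        sample is consumed. The sampler is expected to raise once the timeout
        expires without a successful probe.
        """
        sampler = TimeoutSampler(
            wait_timeout=wait_timeout,
            sleep=5,
            func=assert_positive_mr_registry,
            model_registry_instance_rest_endpoint=rest_endpoint,
            token=token,
        )
        for _ in sampler:
            break  # first successful probe is enough

    @pytest.mark.sanity
    def test_user_permission_non_admin_user(
        self: Self,
        is_byoidc: bool,
        admin_client: DynamicClient,
        test_idp_user: UserTestSession,
        model_registry_instance_rest_endpoint: list[str],
        user_credentials_rbac: dict[str, str],
        login_as_test_user: None,
    ):
        """
        Verify that a user without any Model Registry grant is rejected with HTTP 403.

        The client constructor itself must raise ``ForbiddenException``; the status
        code is asserted explicitly so a different 4xx does not pass as "forbidden".
        """
        if is_byoidc:
            token = get_mr_user_token(admin_client=admin_client, user_credentials_rbac=user_credentials_rbac)
        else:
            # presumably the token of the user logged in by `login_as_test_user` — confirm
            token = get_openshift_token()
        client_args = build_mr_client_args(rest_endpoint=model_registry_instance_rest_endpoint[0], token=token)
        with pytest.raises(ForbiddenException) as exc_info:
            ModelRegistryClient(**client_args)
        assert exc_info.value.status == 403, f"Expected HTTP 403 ForbiddenException, but got {exc_info.value.status}"
        LOGGER.info("Successfully received expected HTTP 403 status code")

    @pytest.mark.sanity
    def test_user_added_to_group(
        self: Self,
        is_byoidc: bool,
        admin_client: DynamicClient,
        model_registry_instance_rest_endpoint: list[str],
        test_idp_user: UserTestSession,
        user_credentials_rbac: dict[str, str],
        model_registry_group_with_user: Group,
        login_as_test_user: None,
    ):
        """
        Verify that the user gains access after being added to the authorized group
        (``model_registry_group_with_user``).

        Access is polled for up to 240 seconds because group membership changes are
        not effective immediately.
        """
        # The BYOIDC path mints a token for a fixed user name; copy the dict so the
        # shared fixture is not mutated for later tests.
        creds_copy = user_credentials_rbac.copy()
        creds_copy["username"] = "mr-user1"
        if is_byoidc:
            token = get_mr_user_token(admin_client=admin_client, user_credentials_rbac=creds_copy)
        else:
            token = get_openshift_token()
        self._wait_for_mr_access(
            rest_endpoint=model_registry_instance_rest_endpoint[0],
            token=token,
            wait_timeout=240,
        )
        LOGGER.info("Successfully accessed Model Registry")

    @pytest.mark.sanity
    def test_create_group(
        self: Self,
        skip_test_on_byoidc: None,
        test_idp_user: UserTestSession,
        model_registry_instance_rest_endpoint: list[str],
        created_role_binding_group: RoleBinding,
        login_as_test_user: None,
    ):
        """
        Test creating a new group and granting it Model Registry access.

        This test verifies that:
        1. A new group can be created and the user added to it
        2. The group can be granted Model Registry access via RoleBinding
        3. Users in the group can access the Model Registry

        Group creation and the RoleBinding are provided by ``created_role_binding_group``;
        the test itself only probes access. Skipped on BYOIDC (``skip_test_on_byoidc``).
        """
        assert_positive_mr_registry(
            model_registry_instance_rest_endpoint=model_registry_instance_rest_endpoint[0],
        )

    @pytest.mark.sanity
    def test_add_single_user_role_binding(
        self: Self,
        is_byoidc: bool,
        admin_client: DynamicClient,
        test_idp_user: UserTestSession,
        model_registry_instance_rest_endpoint: list[str],
        user_credentials_rbac: dict[str, str],
        created_role_binding_user: RoleBinding,
        login_as_test_user: None,
    ):
        """
        Test granting Model Registry access to a single user.

        This test verifies that:
        1. A single user can be granted Model Registry access via RoleBinding
        2. The user can access the Model Registry after being granted access

        The RoleBinding is provided by ``created_role_binding_user``.
        """
        if is_byoidc:
            # Copy so the shared fixture is not mutated for later tests.
            creds_copy = user_credentials_rbac.copy()
            creds_copy["username"] = "mr-non-admin"
            self._wait_for_mr_access(
                rest_endpoint=model_registry_instance_rest_endpoint[0],
                token=get_mr_user_token(admin_client=admin_client, user_credentials_rbac=creds_copy),
                wait_timeout=120,
            )
            LOGGER.info("Successfully accessed Model Registry")
        else:
            # NOTE(review): unlike the BYOIDC branch this does not poll; it relies on the
            # RoleBinding fixture already being effective — confirm that holds.
            assert_positive_mr_registry(model_registry_instance_rest_endpoint=model_registry_instance_rest_endpoint[0])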
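class TestUserMultiProjectPermission:
    """
    Test suite for verifying user permissions in a multi-project setup for the Model Registry.
    """

    @pytest.mark.parametrize(
        (
            "db_secret_parametrized, "
            "db_pvc_parametrized, "
            "db_service_parametrized, "
            "db_deployment_parametrized, "
            "model_registry_instance_parametrized"
        ),
        MR_MULTIPROJECT_TEST_SCENARIO_PARAMS,
        indirect=True,
    )
    @pytest.mark.sanity
    def test_user_permission_multi_project_parametrized(
        self: Self,
        is_byoidc: bool,
        test_idp_user: UserTestSession,
        admin_client: DynamicClient,
        updated_dsc_component_state_scope_session: DataScienceCluster,
        model_registry_namespace: str,
        db_secret_parametrized: list[Secret],
        db_pvc_parametrized: list[PersistentVolumeClaim],
        db_service_parametrized: list[Service],
        db_deployment_parametrized: list[Deployment],
        user_credentials_rbac: dict[str, str],
        model_registry_instance_parametrized: list[ModelRegistry],
        login_as_test_user: None,
    ):
        """
        Verify that a user can be granted access to one MR instance at a time.

        All resources (MR instances and databases) are created in the same dynamically
        generated namespace. For each instance in turn the user is granted access, the
        grant is confirmed, every *other* instance is confirmed to still reject the user,
        and the grant is revoked. The revoke runs in a ``finally`` so that a failing
        probe does not leave the test user with a standing grant on the cluster.

        Raises:
            ValueError: if the parametrized fixture did not yield ``NUM_MR_INSTANCES`` instances.
        """
        if len(model_registry_instance_parametrized) != NUM_MR_INSTANCES:
            raise ValueError(
                f"Expected {NUM_MR_INSTANCES} MR instances, but got {len(model_registry_instance_parametrized)}"
            )
        LOGGER.info(f"Model Registry namespace: {model_registry_namespace}")

        # Resolve the REST endpoint of every instance up front.
        mr_data = []
        for mr_instance in model_registry_instance_parametrized:
            service = get_mr_service_by_label(
                client=admin_client,
                namespace_name=model_registry_namespace,
                mr_instance=mr_instance,
            )
            endpoint = get_endpoint_from_mr_service(svc=service, protocol=Protocols.REST)
            mr_data.append({"instance": mr_instance, "endpoint": endpoint, "name": mr_instance.name})

        token = (
            get_openshift_token()
            if not is_byoidc
            else get_mr_user_token(admin_client=admin_client, user_credentials_rbac=user_credentials_rbac)
        )
        username = user_credentials_rbac["username"]

        # Name of the instance the user currently holds a grant for, so the cleanup
        # below can revoke exactly that grant whether or not the loop completed.
        granted_mr_name: str | None = None
        try:
            for i, current_mr_data in enumerate(mr_data):
                current_name = current_mr_data["name"]
                LOGGER.info(f"Testing access to MR instance {i + 1}/{len(mr_data)}: {current_name}")

                grant_mr_access(
                    admin_client=admin_client,
                    user=username,
                    mr_instance_name=current_name,
                    model_registry_namespace=model_registry_namespace,
                )
                granted_mr_name = current_name

                # Grants are reconciled asynchronously; poll until the instance accepts the user.
                sampler = TimeoutSampler(
                    wait_timeout=240,
                    sleep=5,
                    func=assert_positive_mr_registry,
                    model_registry_instance_rest_endpoint=current_mr_data["endpoint"],
                    token=token,
                )
                for _ in sampler:
                    break

                # The grant must be scoped to this instance only: every other instance must
                # still reject the user. The previous revoke may also still be reconciling,
                # so retry until ForbiddenException is observed.
                other_mr_names = [mr["name"] for j, mr in enumerate(mr_data) if j != i]
                for j, other_mr_data in enumerate(mr_data):
                    if j == i:
                        continue
                    sampler = TimeoutSampler(
                        wait_timeout=360,
                        sleep=10,
                        func=assert_forbidden_access,
                        endpoint=other_mr_data["endpoint"],
                        token=token,
                    )
                    for _ in sampler:
                        break
                LOGGER.info(f"User has access to {current_name}, but not to: {', '.join(other_mr_names)}")

                # Revoke before moving on; the last instance is revoked by the cleanup below.
                if i < len(mr_data) - 1:
                    revoke_mr_access(
                        admin_client=admin_client,
                        user=username,
                        mr_instance_name=current_name,
                        model_registry_namespace=model_registry_namespace,
                    )
                    granted_mr_name = None
        finally:
            # Leave the cluster without a standing grant for the test user, both on the
            # normal path (last instance) and when a probe failed mid-loop.
            if granted_mr_name is not None:
                revoke_mr_access(
                    admin_client=admin_client,
                    user=username,
                    mr_instance_name=granted_mr_name,
                    model_registry_namespace=model_registry_namespace,
                )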