|
12 | 12 | from ocp_resources.deployment import Deployment |
13 | 13 |
|
14 | 14 | from ocp_resources.model_registry import ModelRegistry |
| 15 | +import schemathesis.schemas |
| 16 | +from schemathesis.specs.openapi.schemas import BaseOpenAPISchema |
| 17 | +from schemathesis.generation.stateful.state_machine import APIStateMachine |
| 18 | +from schemathesis.core.transport import Response |
| 19 | +from schemathesis.generation.case import Case |
15 | 20 | from ocp_resources.resource import ResourceEditor |
16 | 21 |
|
17 | 22 | from pytest import FixtureRequest |
|
44 | 49 |
|
45 | 50 | @pytest.fixture(scope="class") |
46 | 51 | def model_registry_namespace(request: FixtureRequest, admin_client: DynamicClient) -> Generator[Namespace, Any, Any]: |
| 52 | + # TODO: model_registry_namespace fixture should basically be doing this 1) check the ns exists, 2) it is in ACTIVE |
| 53 | + # state and return. But it should not create the ns. If DSC manages registriesNamespace, and namespace was not |
| 54 | + # created when mr is updated to Managed state, it would be a bug and we should catch it |
| 55 | + # To be handled in upcoming PR. |
47 | 56 | with create_ns( |
48 | 57 | name=request.param.get("namespace_name", MR_NAMESPACE), |
49 | 58 | admin_client=admin_client, |
@@ -301,11 +310,32 @@ def model_registry_instance_rest_endpoint( |
301 | 310 |
|
302 | 311 |
|
303 | 312 | @pytest.fixture(scope="class") |
304 | | -def generated_schema(model_registry_instance_rest_endpoint: str) -> Any: |
305 | | - return schemathesis.from_uri( |
306 | | - uri="https://raw.githubusercontent.com/kubeflow/model-registry/main/api/openapi/model-registry.yaml", |
307 | | - base_url=f"https://{model_registry_instance_rest_endpoint}/", |
| 313 | +def generated_schema(model_registry_instance_rest_endpoint: str) -> BaseOpenAPISchema: |
| 314 | + schema = schemathesis.openapi.from_url( |
| 315 | + url="https://raw.githubusercontent.com/kubeflow/model-registry/main/api/openapi/model-registry.yaml" |
308 | 316 | ) |
| 317 | + schema.configure(base_url=f"https://{model_registry_instance_rest_endpoint}/") |
| 318 | + return schema |
| 319 | + |
| 320 | + |
| 321 | +@pytest.fixture |
| 322 | +def state_machine(generated_schema: BaseOpenAPISchema, current_client_token: str) -> APIStateMachine: |
| 323 | + BaseAPIWorkflow = generated_schema.as_state_machine() |
| 324 | + |
| 325 | + class APIWorkflow(BaseAPIWorkflow): # type: ignore |
| 326 | + headers: dict[str, str] |
| 327 | + |
| 328 | + def setup(self) -> None: |
| 329 | + self.headers = {"Authorization": f"Bearer {current_client_token}", "Content-Type": "application/json"} |
| 330 | + |
| 331 | + # these kwargs are passed to requests.request() |
| 332 | + def get_call_kwargs(self, case: Case) -> dict[str, Any]: |
| 333 | + return {"verify": False, "headers": self.headers} |
| 334 | + |
| 335 | + def after_call(self, response: Response, case: Case) -> None: |
| 336 | + LOGGER.info(f"{case.method} {case.path} -> {response.status_code}") |
| 337 | + |
| 338 | + return APIWorkflow |
309 | 339 |
|
310 | 340 |
|
311 | 341 | @pytest.fixture(scope="class") |
|
0 commit comments