|
| 1 | +from typing import Generator, Any, Dict |
| 2 | + |
| 3 | +import pytest |
| 4 | +from kubernetes.dynamic import DynamicClient |
| 5 | +from _pytest.fixtures import FixtureRequest |
| 6 | +from pytest_testconfig import config as py_config |
| 7 | + |
| 8 | +from utilities.kueue_utils import ( |
| 9 | + create_local_queue, |
| 10 | + create_cluster_queue, |
| 11 | + create_resource_flavor, |
| 12 | + LocalQueue, |
| 13 | + ClusterQueue, |
| 14 | + ResourceFlavor, |
| 15 | +) |
| 16 | +from ocp_resources.namespace import Namespace |
| 17 | +from ocp_resources.config_map import ConfigMap |
| 18 | +from ocp_resources.deployment import Deployment |
| 19 | +from ocp_resources.persistent_volume_claim import PersistentVolumeClaim |
| 20 | +from ocp_resources.resource import ResourceEditor |
| 21 | +from utilities.constants import Labels, Annotations |
| 22 | +from utilities.infra import create_ns |
| 23 | +from utilities.kueue_detection import ( |
| 24 | + detect_kueue_installation_scenario, |
| 25 | + should_patch_kueue_config, |
| 26 | + should_restart_kueue_deployment, |
| 27 | +) |
| 28 | +import yaml |
| 29 | +import logging |
| 30 | + |
| 31 | +LOGGER = logging.getLogger(__name__) |
| 32 | + |
| 33 | + |
def kueue_resource_groups_for_notebooks(
    flavor_name: str,
    cpu_quota: str,
    memory_quota: str,
) -> list[dict[str, Any]]:
    """Build the ClusterQueue ``spec.resourceGroups`` payload for notebook workloads.

    Args:
        flavor_name: Name of the ResourceFlavor the quotas are attached to.
        cpu_quota: Nominal CPU quota as a Kubernetes quantity string (e.g. "2").
        memory_quota: Nominal memory quota as a Kubernetes quantity string (e.g. "4Gi").

    Returns:
        A single-element list matching the Kueue ClusterQueue resourceGroups
        schema, covering the "cpu" and "memory" resources under one flavor.
    """
    # Single resource group: both covered resources are served by the same flavor.
    return [
        {
            "coveredResources": ["cpu", "memory"],
            "flavors": [
                {
                    "name": flavor_name,
                    "resources": [
                        {"name": "cpu", "nominalQuota": cpu_quota},
                        {"name": "memory", "nominalQuota": memory_quota},
                    ],
                }
            ],
        }
    ]
| 54 | + |
| 55 | + |
@pytest.fixture(scope="class")
def kueue_notebook_resource_flavor(
    request: FixtureRequest,
    admin_client: DynamicClient,
) -> Generator[ResourceFlavor, Any, Any]:
    """Yield a class-scoped ResourceFlavor for notebook workloads; deleted on teardown."""
    flavor_kwargs = {
        "client": admin_client,
        "name": request.param["name"],
        "teardown": True,
    }
    with create_resource_flavor(**flavor_kwargs) as flavor:
        yield flavor
| 68 | + |
| 69 | + |
@pytest.fixture(scope="class")
def kueue_notebook_cluster_queue(
    request: FixtureRequest,
    admin_client: DynamicClient,
) -> Generator[ClusterQueue, Any, Any]:
    """Yield a class-scoped ClusterQueue for notebook workloads; deleted on teardown."""
    params = request.param

    # Build the resourceGroups payload from the parametrized flavor/quota values.
    groups = kueue_resource_groups_for_notebooks(
        flavor_name=params["resource_flavor_name"],
        cpu_quota=params["cpu_quota"],
        memory_quota=params["memory_quota"],
    )

    with create_cluster_queue(
        client=admin_client,
        name=params["name"],
        resource_groups=groups,
        namespace_selector=params.get("namespace_selector", {}),
        teardown=True,
    ) as queue:
        yield queue
| 90 | + |
| 91 | + |
@pytest.fixture(scope="class")
def kueue_notebook_local_queue(
    request: FixtureRequest,
    admin_client: DynamicClient,
    kueue_enabled_notebook_namespace: Namespace,
) -> Generator[LocalQueue, Any, Any]:
    """Yield a class-scoped LocalQueue in the Kueue-enabled namespace; deleted on teardown."""
    params = request.param
    target_namespace = kueue_enabled_notebook_namespace.name

    with create_local_queue(
        client=admin_client,
        name=params["name"],
        cluster_queue=params["cluster_queue"],
        namespace=target_namespace,
        teardown=True,
    ) as queue:
        yield queue
| 107 | + |
| 108 | + |
@pytest.fixture(scope="function")
def kueue_notebook_persistent_volume_claim(
    request: FixtureRequest,
    kueue_enabled_notebook_namespace: Namespace,
    unprivileged_client: DynamicClient,
) -> Generator[PersistentVolumeClaim, Any, Any]:
    """Create a PersistentVolumeClaim in the Kueue-enabled namespace.

    request.param keys:
        name (str): PVC name (required).
        size (str, optional): Requested storage size as a Kubernetes quantity
            string; defaults to "10Gi" (the previously hard-coded value).
    """
    with PersistentVolumeClaim(
        client=unprivileged_client,
        name=request.param["name"],
        namespace=kueue_enabled_notebook_namespace.name,
        # Dashboard label so the PVC is visible in the ODH dashboard UI.
        label={Labels.OpenDataHub.DASHBOARD: "true"},
        accessmodes=PersistentVolumeClaim.AccessMode.RWO,
        # Size is now parametrizable; default preserves the original behavior.
        size=request.param.get("size", "10Gi"),
        volume_mode=PersistentVolumeClaim.VolumeMode.FILE,
    ) as pvc:
        yield pvc
| 126 | + |
| 127 | + |
@pytest.fixture(scope="class")
def kueue_enabled_notebook_namespace(
    request: FixtureRequest,
    admin_client: DynamicClient,
    unprivileged_client: DynamicClient,
) -> Generator[Namespace, Any, Any]:
    """Yield a class-scoped namespace for notebook workloads, optionally Kueue-labeled."""
    params = request.param

    with create_ns(
        admin_client=admin_client,
        name=params["name"],
        unprivileged_client=unprivileged_client,
        # Label the namespace for Kueue management unless the test opts out.
        add_kueue_label=params.get("add-kueue-label", True),
        pytest_request=request,
    ) as namespace:
        yield namespace
| 147 | + |
| 148 | + |
@pytest.fixture(scope="class")
def patched_kueue_manager_config(
    admin_client: DynamicClient,
    request: FixtureRequest,
) -> Generator[ConfigMap | None, Any, Any]:
    """Conditionally patch the kueue-manager-config ConfigMap based on Kueue installation scenario

    This fixture:
    1. Detects the Kueue installation scenario
    2. For RHOAI managed scenario: patches ConfigMap and restarts deployment
    3. For Red Hat build of Kueue operator scenario: skips patching (no config/deployment present)
    4. Yields the ConfigMap (or None if skipped) for tests
    5. On teardown: restores original config if it was patched

    The patch adds "pod" and "statefulset" to integrations.frameworks in
    controller_manager_config.yaml and marks the ConfigMap unmanaged so the
    operator does not revert it.

    NOTE(review): `request` is accepted but not read anywhere in this body —
    possibly kept for fixture-parametrization symmetry; confirm before removing.
    """
    # Detect Kueue installation scenario
    scenario = detect_kueue_installation_scenario(client=admin_client)
    LOGGER.info(f"Detected Kueue installation scenario: {scenario}")

    # Check if we should patch config for this scenario; scenarios without a
    # patchable config (e.g. the Kueue operator build) yield None and exit early.
    if not should_patch_kueue_config(scenario):
        LOGGER.info(f"Skipping kueue-manager-config patching for scenario: {scenario}")
        yield None
        return

    namespace = py_config["applications_namespace"]
    config_map_name = "kueue-manager-config"

    # Get the existing ConfigMap; ensure_exists=True raises if it is absent.
    try:
        config_map = ConfigMap(
            client=admin_client,
            name=config_map_name,
            namespace=namespace,
            ensure_exists=True,
        )
    except Exception as e:
        # Missing ConfigMap is treated as "nothing to patch", not a failure.
        LOGGER.warning(f"Could not find kueue-manager-config ConfigMap: {e}")
        LOGGER.info("This is expected for Red Hat build of Kueue operator scenario")
        yield None
        return

    # Store original data and annotations for restoration during teardown.
    original_data = config_map.instance.data.copy() if config_map.instance.data else {}
    original_annotations = (
        config_map.instance.metadata.annotations.copy() if config_map.instance.metadata.annotations else {}
    )

    LOGGER.info("Storing original kueue-manager-config data for restoration")

    # Get current config data
    current_data = config_map.instance.data or {}
    config_yaml = current_data.get("controller_manager_config.yaml", "{}")

    # Parse the existing configuration; fall back to an empty dict on bad YAML
    # so the patch below still produces a valid minimal config.
    try:
        config_dict = yaml.safe_load(config_yaml) or {}
    except yaml.YAMLError:
        config_dict = {}

    # Ensure integrations section exists
    if "integrations" not in config_dict:
        config_dict["integrations"] = {}

    if "frameworks" not in config_dict["integrations"]:
        config_dict["integrations"]["frameworks"] = []

    # Add pod and statefulset if not already present (idempotent append).
    frameworks = config_dict["integrations"]["frameworks"]
    if "pod" not in frameworks:
        frameworks.append("pod")
    if "statefulset" not in frameworks:
        frameworks.append("statefulset")

    # Convert back to YAML
    updated_config_yaml = yaml.dump(config_dict, default_flow_style=False)
    updated_data = {**current_data, "controller_manager_config.yaml": updated_config_yaml}

    # Apply the patch with both data and metadata annotations; MANAGED="false"
    # marks the ConfigMap as not operator-managed while the patch is in effect.
    patch = {"metadata": {"annotations": {Annotations.OpenDataHubIo.MANAGED: "false"}}, "data": updated_data}

    def restart_kueue_deployment(reason: str) -> None:
        """Helper function to restart the kueue-controller-manager deployment

        Restarts by scaling to 0 and back to the previous replica count so the
        controller re-reads its configuration. No-op for scenarios where
        should_restart_kueue_deployment() is False; failures are logged, not raised.
        """
        if not should_restart_kueue_deployment(scenario):
            LOGGER.info(f"Skipping kueue-controller-manager deployment restart for scenario: {scenario}")
            return

        LOGGER.info(f"Restarting kueue-controller-manager deployment - {reason}")

        try:
            kueue_deployment = Deployment(
                client=admin_client,
                name="kueue-controller-manager",
                namespace=namespace,
                ensure_exists=True,
            )

            # Get current replica count before restart; default to 1 if unset.
            current_replicas = kueue_deployment.replicas
            if current_replicas is None:
                current_replicas = 1
            LOGGER.info(f"Current kueue-controller-manager replicas: {current_replicas}")

            # Restart deployment by scaling to 0 and back to original count
            LOGGER.info("Scaling kueue-controller-manager deployment to 0 replicas...")
            kueue_deployment.scale_replicas(replica_count=0)
            kueue_deployment.wait_for_replicas(deployed=False)
            LOGGER.info("kueue-controller-manager deployment scaled down to 0 replicas")

            # Now scale back up to original count
            LOGGER.info(f"Scaling kueue-controller-manager deployment back to {current_replicas} replicas...")
            kueue_deployment.scale_replicas(replica_count=current_replicas)
            kueue_deployment.wait_for_replicas(deployed=True)

            LOGGER.info(f"kueue-controller-manager deployment restart completed - {reason}")
        except Exception as e:
            # Best-effort restart: a missing deployment is expected in some scenarios.
            LOGGER.warning(f"Could not restart kueue-controller-manager deployment: {e}")
            LOGGER.info("This is expected for Red Hat build of Kueue operator scenario")

    with ResourceEditor(patches={config_map: patch}):
        # After patching the ConfigMap, restart the deployment to pick up new configuration
        restart_kueue_deployment(reason="to apply patched configuration")
        yield config_map

    # Teardown: Restore original configuration and restart deployment
    LOGGER.info("Restoring original kueue-manager-config configuration")

    # Restore original data and annotations
    restore_patch = {"metadata": {"annotations": original_annotations}, "data": original_data}

    # NOTE(review): ResourceEditor appears to restore its backup when the `with`
    # block exits, so the outer editor above likely already reverted the patch;
    # this second editor re-applies the saved originals and then restores its own
    # backup on exit. Confirm the intended net state against the ResourceEditor
    # implementation — a one-shot `.update()` may be what is actually wanted here.
    with ResourceEditor(patches={config_map: restore_patch}):
        # Restart deployment to pick up the restored original configuration
        restart_kueue_deployment(reason="to restore original configuration")
        LOGGER.info("Original kueue-manager-config configuration restored successfully")
0 commit comments