From 4d1c5a0c08c67855d2cf70ed94a122d806a65832 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Fri, 21 Nov 2025 02:34:27 +0100 Subject: [PATCH 1/8] init --- .../component-guide/deployers/kubernetes.md | 22 +- src/zenml/cli/utils.py | 40 ++- src/zenml/enums.py | 11 + .../deployers/kubernetes_deployer.py | 321 ++++++++++++++++-- .../flavors/kubernetes_deployer_flavor.py | 13 +- .../integrations/kubernetes/kube_utils.py | 64 ++++ 6 files changed, 418 insertions(+), 53 deletions(-) diff --git a/docs/book/component-guide/deployers/kubernetes.md b/docs/book/component-guide/deployers/kubernetes.md index d856954e407..ba4547834b8 100644 --- a/docs/book/component-guide/deployers/kubernetes.md +++ b/docs/book/component-guide/deployers/kubernetes.md @@ -150,7 +150,7 @@ def greet(name: str) -> str: settings = { "deployer": KubernetesDeployerSettings( namespace="my-namespace", - service_type="LoadBalancer", # or "NodePort", "ClusterIP" + service_type="LoadBalancer", service_port=8000, ), "resources": ResourceSettings( @@ -174,11 +174,11 @@ For production deployments, add health probes, labels, and resource limits: settings = { "deployer": KubernetesDeployerSettings( namespace="production", - service_type="ClusterIP", # Use with Ingress + service_type="ClusterIP", service_port=8000, + url_preference="ingress", - # Labels for organization labels={ "environment": "production", "team": "ml-platform", @@ -517,12 +517,27 @@ For a complete list of all available settings, see the `KubernetesDeployerSettin * `strict_additional_resources`: If `True`, fail deployment if any additional resource fails (default: `True`) * `custom_templates_dir`: Path to directory with custom Jinja2 templates +**URL Selection**: +* `url_preference`: Which URL type to return when multiple are available. Defaults to `auto`, which follows the configured `service_type` priority (`LoadBalancer` → `NodePort` → `ClusterIP`). Set to `ingress` or `gateway_api` when you want those URLs returned; the deployer will fail if the requested URL type cannot be resolved. + **Internal Settings**: * `wait_for_load_balancer_timeout`: Timeout for LoadBalancer IP assignment (default: `150` seconds, `0` to skip) * `deployment_ready_check_interval`: Interval between readiness checks (default: `2` seconds) Check out [this docs page](https://docs.zenml.io/concepts/steps_and_pipelines/configuration) for more information on how to specify settings. +### How deployment URLs are chosen + +ZenML stores all discovered URLs (Gateway API, Ingress, LoadBalancer, NodePort, ClusterIP) in deployment metadata, but only returns one URL based on `url_preference`: + +- `auto` (default) mirrors your `service_type` choice: LoadBalancer → NodePort → ClusterIP. Gateway/Ingress URLs are ignored unless explicitly requested. +- Set `url_preference="ingress"` or `url_preference="gateway_api"` when you add those resources (e.g., via `additional_resources`). If the requested URL type cannot be resolved, the deployment state call fails instead of silently falling back. +- Set `url_preference="load_balancer"` when you want to strictly require an external IP; the deployer will error if the LoadBalancer is still pending. + +Tip: When using Ingress or Gateway API, combine `service_type="ClusterIP"` with the matching `url_preference` so the returned URL matches the routing layer you manage. + +- Strict preference behavior: If you set `url_preference` to a specific type and that URL can't be discovered (for example, LoadBalancer is still pending), the deployer raises an error instead of returning another URL type. This helps avoid accidental exposure paths. + ## Troubleshooting @@ -592,4 +607,3 @@ If pods can't pull the container image: 6. **Use HPA** for autoscaling based on actual load 7. **Configure PodDisruptionBudget** for high availability during cluster updates 8. **Keep additional resources in version control** alongside your pipeline code - diff --git a/src/zenml/cli/utils.py b/src/zenml/cli/utils.py index 962c470e0a2..695cfd92d26 100644 --- a/src/zenml/cli/utils.py +++ b/src/zenml/cli/utils.py @@ -2517,10 +2517,9 @@ def pretty_print_deployment( if deployment.url: declare("\n[bold]Connection information:[/bold]") - declare(f"\n[bold]Endpoint URL:[/bold] [link]{deployment.url}[/link]") - declare( - f"[bold]Swagger URL:[/bold] [link]{deployment.url.rstrip('/')}/docs[/link]" - ) + endpoint_url = deployment.url.rstrip("/") + declare(f"\n[bold]Endpoint URL:[/bold] [link]{endpoint_url}[/link]") + declare(f"[bold]Swagger URL:[/bold] [link]{endpoint_url}/docs[/link]") # Auth key handling with proper security auth_key = deployment.auth_key @@ -2557,27 +2556,34 @@ def pretty_print_deployment( # cURL example declare("\n[bold]cURL example:[/bold]") - curl_headers = [] - if auth_key: - if show_secret: - curl_headers.append(f'-H "Authorization: Bearer {auth_key}"') - else: - curl_headers.append( - '-H "Authorization: Bearer "' - ) + if not deployment.url: + console.print("No endpoint URL available.") + else: + base_url = deployment.url.rstrip("/") + + curl_headers = [] + if auth_key: + if show_secret: + curl_headers.append( + f'-H "Authorization: Bearer {auth_key}"' + ) + else: + curl_headers.append( + '-H "Authorization: Bearer "' + ) - curl_params = json.dumps(example, indent=2).replace("\n", "\n ") + curl_params = json.dumps(example, indent=2).replace("\n", "\n ") - curl_headers.append('-H "Content-Type: application/json"') - headers_str = "\\\n ".join(curl_headers) + curl_headers.append('-H "Content-Type: application/json"') + headers_str = "\\\n ".join(curl_headers) - curl_command = f"""curl -X POST {deployment.url}/invoke \\ + curl_command = f"""curl -X POST {base_url}/invoke \\ {headers_str} \\ -d '{{ "parameters": {curl_params} }}'""" - console.print(curl_command) + console.print(curl_command) # JSON Schemas if show_schema: diff --git a/src/zenml/enums.py b/src/zenml/enums.py index 5c2b8d3fcd2..b5505e6a7f5 100644 --- a/src/zenml/enums.py +++ b/src/zenml/enums.py @@ -555,6 +555,17 @@ class KubernetesServiceType(StrEnum): CLUSTER_IP = "ClusterIP" +class KubernetesUrlPreference(StrEnum): + """URL preference for Kubernetes deployer when multiple URL types are available.""" + + GATEWAY_API = "gateway_api" + INGRESS = "ingress" + LOAD_BALANCER = "load_balancer" + NODE_PORT = "node_port" + CLUSTER_IP = "cluster_ip" + AUTO = "auto" + + class StepRuntime(StrEnum): """All possible runtime modes for a step.""" diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py index 0ab88e6dd88..46dc8550658 100644 --- a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -46,7 +46,12 @@ from zenml.entrypoints.base_entrypoint_configuration import ( SNAPSHOT_ID_OPTION, ) -from zenml.enums import DeploymentStatus, StackComponentType +from zenml.enums import ( + DeploymentStatus, + KubernetesServiceType, + KubernetesUrlPreference, + StackComponentType, +) from zenml.integrations.kubernetes import kube_utils from zenml.integrations.kubernetes.flavors.kubernetes_deployer_flavor import ( KubernetesDeployerConfig, @@ -1041,23 +1046,17 @@ def _get_deployment_state_from_inventory( DeploymentNotFoundError: If no deployment resources found in cluster. DeployerError: If an error occurs checking resources. """ - deployment_items = [ - item - for item in inventory - if item.kind == "Deployment" and item.api_version == "apps/v1" - ] - - service_items = [ - item - for item in inventory - if item.kind == "Service" and item.api_version == "v1" - ] - try: status = DeploymentStatus.PENDING all_ready = True any_exists = False + deployment_items = [ + item + for item in inventory + if item.kind == "Deployment" and item.api_version == "apps/v1" + ] + for deployment_item in deployment_items: k8s_deployment = self.k8s_applier.get_resource( name=deployment_item.name, @@ -1102,31 +1101,29 @@ def _get_deployment_state_from_inventory( if all_ready: status = DeploymentStatus.RUNNING - url = None namespace = settings.namespace - for service_item in service_items: - k8s_service = self.k8s_applier.get_resource( - name=service_item.name, - namespace=service_item.namespace, - kind="Service", - api_version="v1", - ) + discovered_urls = self._discover_urls( + inventory=inventory, + namespace=namespace, + ) + url = self._select_url( + discovered_urls=discovered_urls, + settings=settings, + deployment_name=deployment_items[0].name + if deployment_items + else "unknown", + ) - if k8s_service: - service_url = kube_utils.build_service_url( - core_api=self.k8s_core_api, - service=k8s_service, - namespace=service_item.namespace or namespace, - ingress=None, - ) - if service_url: - url = service_url - break + urls_metadata = { + k: v for k, v in discovered_urls.items() if v is not None + } metadata = { "namespace": namespace, "labels": settings.labels, + "urls": urls_metadata, + "url_preference": settings.url_preference, } return DeploymentOperationalState( @@ -1142,6 +1139,268 @@ def _get_deployment_state_from_inventory( ) raise DeployerError(f"Failed to get deployment state: {e}") + def _discover_urls( + self, + inventory: List[ResourceInventoryItem], + namespace: str, + ) -> Dict[str, Optional[str]]: + """Discover all reachable URLs from Kubernetes resources. + + Args: + inventory: The resource inventory to check. + namespace: The namespace to check. + + Returns: + A dictionary of URLs, keyed by the URL type. + """ + discovered_urls: Dict[str, Optional[str]] = { + "gateway_api": None, + "ingress": None, + "load_balancer": None, + "node_port": None, + "cluster_ip": None, + } + + service_items = [ + item + for item in inventory + if item.kind == "Service" and item.api_version == "v1" + ] + ingress_items = [ + item + for item in inventory + if item.kind == "Ingress" + and item.api_version == "networking.k8s.io/v1" + ] + gateway_items = [ + item + for item in inventory + if item.kind == "Gateway" + and item.api_version == "gateway.networking.k8s.io/v1beta1" + ] + httproute_items = [ + item + for item in inventory + if item.kind == "HTTPRoute" + and item.api_version == "gateway.networking.k8s.io/v1beta1" + ] + + for service_item in service_items: + k8s_service = self.k8s_applier.get_resource( + name=service_item.name, + namespace=service_item.namespace, + kind="Service", + api_version="v1", + ) + + if not k8s_service: + continue + + service_namespace = service_item.namespace or namespace + + if not discovered_urls["gateway_api"]: + for httproute_item in httproute_items: + httproute_namespace = httproute_item.namespace or namespace + if httproute_namespace != service_namespace: + continue + + k8s_httproute = self.k8s_applier.get_resource( + name=httproute_item.name, + namespace=httproute_namespace, + kind="HTTPRoute", + api_version="gateway.networking.k8s.io/v1beta1", + ) + + if not k8s_httproute: + continue + + httproute_dict = normalize_resource_to_dict(k8s_httproute) + httproute_spec = httproute_dict.get("spec", {}) + httproute_rules = httproute_spec.get("rules", []) + + routes_to_service = False + for rule in httproute_rules: + backend_refs = rule.get("backendRefs", []) + for backend_ref in backend_refs: + backend_service_name = backend_ref.get("name") + backend_namespace = backend_ref.get("namespace") + if backend_namespace: + backend_namespace = ( + backend_namespace or httproute_namespace + ) + else: + backend_namespace = httproute_namespace + + if ( + backend_service_name == service_item.name + and backend_namespace == service_namespace + ): + routes_to_service = True + break + if routes_to_service: + break + + if not routes_to_service: + continue + + parent_refs = httproute_spec.get("parentRefs", []) + if not parent_refs: + continue + + parent_ref = parent_refs[0] + gateway_name = parent_ref.get("name") + gateway_namespace = ( + parent_ref.get("namespace") or namespace + ) + + matching_gateway = None + for gateway_item in gateway_items: + gateway_item_namespace = ( + gateway_item.namespace or namespace + ) + if ( + gateway_item.name == gateway_name + and gateway_item_namespace == gateway_namespace + ): + matching_gateway = self.k8s_applier.get_resource( + name=gateway_item.name, + namespace=gateway_item_namespace, + kind="Gateway", + api_version="gateway.networking.k8s.io/v1beta1", + ) + break + + if matching_gateway: + gateway_api_url = kube_utils.build_gateway_api_url( + gateway=matching_gateway, + httproute=k8s_httproute, + ) + if gateway_api_url: + discovered_urls["gateway_api"] = gateway_api_url + + matching_ingress = None + if not discovered_urls["ingress"]: + for ingress_item in ingress_items: + ingress_namespace = ingress_item.namespace or namespace + if ingress_namespace != service_namespace: + continue + + k8s_ingress = self.k8s_applier.get_resource( + name=ingress_item.name, + namespace=ingress_namespace, + kind="Ingress", + api_version="networking.k8s.io/v1", + ) + + if k8s_ingress: + ingress_dict = normalize_resource_to_dict(k8s_ingress) + ingress_spec = ingress_dict.get("spec", {}) + rules = ingress_spec.get("rules", []) + for rule in rules: + http_config = rule.get("http", {}) + paths = http_config.get("paths", []) + for path_config in paths: + backend = path_config.get("backend", {}) + service_backend = backend.get("service", {}) + backend_service_name = service_backend.get( + "name" + ) + if backend_service_name == service_item.name: + matching_ingress = k8s_ingress + break + if matching_ingress: + break + if matching_ingress: + break + + if matching_ingress and not discovered_urls["ingress"]: + ingress_url = kube_utils.build_service_url( + core_api=self.k8s_core_api, + service=k8s_service, + namespace=service_item.namespace or namespace, + ingress=matching_ingress, + ) + if ingress_url: + discovered_urls["ingress"] = ingress_url + + service_url = kube_utils.build_service_url( + core_api=self.k8s_core_api, + service=k8s_service, + namespace=service_item.namespace or namespace, + ingress=None, + ) + + if service_url: + service_dict = normalize_resource_to_dict(k8s_service) + service_type = service_dict.get("spec", {}).get( + "type", "ClusterIP" + ) + + if service_type == "LoadBalancer": + discovered_urls["load_balancer"] = service_url + elif service_type == "NodePort": + discovered_urls["node_port"] = service_url + elif service_type == "ClusterIP": + discovered_urls["cluster_ip"] = service_url + + return discovered_urls + + def _select_url( + self, + discovered_urls: Dict[str, Optional[str]], + settings: KubernetesDeployerSettings, + deployment_name: str, + ) -> Optional[str]: + """Pick the URL according to preference. + + Args: + discovered_urls: The URLs to choose from. + settings: The settings to use. + deployment_name: The name of the deployment. + + Returns: + The selected URL. + + Raises: + DeployerError: If the URL preference is not found. + """ + preference = settings.url_preference + + if preference == KubernetesUrlPreference.AUTO: + priority_order: List[str] = [] + if settings.service_type == KubernetesServiceType.LOAD_BALANCER: + priority_order = [ + KubernetesUrlPreference.LOAD_BALANCER.value, + KubernetesUrlPreference.NODE_PORT.value, + KubernetesUrlPreference.CLUSTER_IP.value, + ] + elif settings.service_type == KubernetesServiceType.NODE_PORT: + priority_order = [ + KubernetesUrlPreference.NODE_PORT.value, + KubernetesUrlPreference.CLUSTER_IP.value, + KubernetesUrlPreference.LOAD_BALANCER.value, + ] + else: + priority_order = [ + KubernetesUrlPreference.CLUSTER_IP.value, + KubernetesUrlPreference.NODE_PORT.value, + KubernetesUrlPreference.LOAD_BALANCER.value, + ] + + for url_type in priority_order: + if discovered_urls.get(url_type): + return discovered_urls[url_type] + return None + + url = discovered_urls.get(preference.value) + if not url: + raise DeployerError( + f"URL preference '{preference.value}' requested but no matching URL " + f"was discovered for deployment '{deployment_name}'." + ) + + return url + def do_get_deployment_state( self, deployment: DeploymentResponse ) -> DeploymentOperationalState: diff --git a/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py b/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py index f0f08789e83..76b3d89cdec 100644 --- a/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py +++ b/src/zenml/integrations/kubernetes/flavors/kubernetes_deployer_flavor.py @@ -23,7 +23,7 @@ BaseDeployerFlavor, BaseDeployerSettings, ) -from zenml.enums import KubernetesServiceType +from zenml.enums import KubernetesServiceType, KubernetesUrlPreference from zenml.integrations.kubernetes import ( KUBERNETES_DEPLOYER_FLAVOR, ) @@ -248,6 +248,17 @@ class KubernetesDeployerSettings(BaseDeployerSettings): description="Interval in seconds between deployment readiness checks.", ) + url_preference: KubernetesUrlPreference = Field( + default=KubernetesUrlPreference.AUTO, + description=( + "Which URL type to prefer when multiple are available. " + "Options: 'auto' (default priority follows service_type: LoadBalancer " + "→ NodePort → ClusterIP), 'gateway_api', 'ingress', 'load_balancer', " + "'node_port', 'cluster_ip'. Gateway/Ingress URLs are only used when explicitly " + "requested via this setting. All discovered URLs are stored in metadata.urls." + ), + ) + @field_validator("readiness_probe_path", "liveness_probe_path") @classmethod def validate_probe_path(cls, v: str) -> str: diff --git a/src/zenml/integrations/kubernetes/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py index d75169197ae..5e1452010e1 100644 --- a/src/zenml/integrations/kubernetes/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -1355,3 +1355,67 @@ def build_service_url( return f"http://{service_name}.{namespace}.svc.cluster.local:{service_port}" return None + + +def build_gateway_api_url( + gateway: Union[Dict[str, Any], Any], + httproute: Union[Dict[str, Any], Any], +) -> Optional[str]: + """Build URL from Gateway API Gateway and HTTPRoute resources. + + Gateway API is the newer Kubernetes standard for ingress/routing. + It uses Gateway (entry point) + HTTPRoute (routing rules) instead of Ingress. + + Args: + gateway: Gateway resource (gateway.networking.k8s.io/v1beta1). + httproute: HTTPRoute resource (gateway.networking.k8s.io/v1beta1). + + Returns: + Service URL from Gateway API, or None if URL cannot be determined. + """ + gateway_dict = normalize_resource_to_dict(gateway) + httproute_dict = normalize_resource_to_dict(httproute) + + gateway_status = gateway_dict.get("status", {}) + gateway_listeners = gateway_status.get("listeners", []) + + httproute_spec = httproute_dict.get("spec", {}) + httproute_hostnames = httproute_spec.get("hostnames", []) + httproute_rules = httproute_spec.get("rules", []) + + if not httproute_hostnames or not httproute_rules: + return None + + hostname = httproute_hostnames[0] + + protocol = "http" + parent_refs = httproute_spec.get("parentRefs", []) + if parent_refs: + parent_ref = parent_refs[0] + listener_name = parent_ref.get("name") + section_name = parent_ref.get("sectionName") + + for listener in gateway_listeners: + listener_name_match = listener.get("name") == listener_name + if section_name: + listener_name_match = listener_name_match and ( + listener.get("name") == section_name + ) + + if listener_name_match: + listener_protocol = listener.get("protocol", "").lower() + if listener_protocol in ("https", "tls"): + protocol = "https" + break + + path = "/" + if httproute_rules: + first_rule = httproute_rules[0] + matches = first_rule.get("matches", []) + if matches: + first_match = matches[0] + match_path = first_match.get("path") + if match_path: + path = match_path.get("value", "/") + + return f"{protocol}://{hostname}{path}" From a71c6db5b12fc7bf134bc4c693ad303d6fd8e0ba Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Fri, 21 Nov 2025 10:24:02 +0100 Subject: [PATCH 2/8] apply claude review --- .../component-guide/deployers/kubernetes.md | 4 +- .../deployers/kubernetes_deployer.py | 321 ++++++++++-------- .../integrations/kubernetes/kube_utils.py | 24 +- .../integrations/kubernetes/conftest.py | 14 + .../test_kubernetes_deployer_urls.py | 166 +++++++++ .../kubernetes/test_kube_utils.py | 50 +++ 6 files changed, 432 insertions(+), 147 deletions(-) create mode 100644 tests/integration/integrations/kubernetes/conftest.py create mode 100644 tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py create mode 100644 tests/integration/integrations/kubernetes/test_kube_utils.py diff --git a/docs/book/component-guide/deployers/kubernetes.md b/docs/book/component-guide/deployers/kubernetes.md index ba4547834b8..58d1f8f4701 100644 --- a/docs/book/component-guide/deployers/kubernetes.md +++ b/docs/book/component-guide/deployers/kubernetes.md @@ -531,10 +531,10 @@ Check out [this docs page](https://docs.zenml.io/concepts/steps_and_pipelines/co ZenML stores all discovered URLs (Gateway API, Ingress, LoadBalancer, NodePort, ClusterIP) in deployment metadata, but only returns one URL based on `url_preference`: - `auto` (default) mirrors your `service_type` choice: LoadBalancer → NodePort → ClusterIP. Gateway/Ingress URLs are ignored unless explicitly requested. -- Set `url_preference="ingress"` or `url_preference="gateway_api"` when you add those resources (e.g., via `additional_resources`). If the requested URL type cannot be resolved, the deployment state call fails instead of silently falling back. +- Set `url_preference="ingress"` or `url_preference="gateway_api"` when you add those resources (e.g., via `additional_resources`). If the requested URL type cannot be resolved during state retrieval (`zenml deployment describe`, dashboard/API refresh), the call fails instead of silently falling back. - Set `url_preference="load_balancer"` when you want to strictly require an external IP; the deployer will error if the LoadBalancer is still pending. -Tip: When using Ingress or Gateway API, combine `service_type="ClusterIP"` with the matching `url_preference` so the returned URL matches the routing layer you manage. +Tip: When using Ingress or Gateway API, combine `service_type="ClusterIP"` with the matching `url_preference` so the returned URL matches the routing layer you manage (and you avoid paying for an extra LoadBalancer). - Strict preference behavior: If you set `url_preference` to a specific type and that URL can't be discovered (for example, LoadBalancer is still pending), the deployer raises an error instead of returning another URL type. This helps avoid accidental exposure paths. diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py index 46dc8550658..5a55c5ee558 100644 --- a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -1145,13 +1145,13 @@ def _discover_urls( namespace: str, ) -> Dict[str, Optional[str]]: """Discover all reachable URLs from Kubernetes resources. - + Args: inventory: The resource inventory to check. namespace: The namespace to check. Returns: - A dictionary of URLs, keyed by the URL type. + Dictionary mapping URL type names to discovered URLs (or None). """ discovered_urls: Dict[str, Optional[str]] = { "gateway_api": None, @@ -1199,151 +1199,200 @@ def _discover_urls( service_namespace = service_item.namespace or namespace if not discovered_urls["gateway_api"]: - for httproute_item in httproute_items: - httproute_namespace = httproute_item.namespace or namespace - if httproute_namespace != service_namespace: - continue - - k8s_httproute = self.k8s_applier.get_resource( - name=httproute_item.name, - namespace=httproute_namespace, - kind="HTTPRoute", - api_version="gateway.networking.k8s.io/v1beta1", + discovered_urls["gateway_api"] = ( + self._discover_gateway_api_url( + service_item=service_item, + service_namespace=service_namespace, + gateway_items=gateway_items, + httproute_items=httproute_items, + namespace=namespace, ) + ) - if not k8s_httproute: - continue + if not discovered_urls["ingress"]: + discovered_urls["ingress"] = self._discover_ingress_url( + service_item=service_item, + service_namespace=service_namespace, + ingress_items=ingress_items, + namespace=namespace, + k8s_service=k8s_service, + ) - httproute_dict = normalize_resource_to_dict(k8s_httproute) - httproute_spec = httproute_dict.get("spec", {}) - httproute_rules = httproute_spec.get("rules", []) - - routes_to_service = False - for rule in httproute_rules: - backend_refs = rule.get("backendRefs", []) - for backend_ref in backend_refs: - backend_service_name = backend_ref.get("name") - backend_namespace = backend_ref.get("namespace") - if backend_namespace: - backend_namespace = ( - backend_namespace or httproute_namespace - ) - else: - backend_namespace = httproute_namespace - - if ( - backend_service_name == service_item.name - and backend_namespace == service_namespace - ): - routes_to_service = True - break - if routes_to_service: - break - - if not routes_to_service: - continue + self._discover_service_urls( + service_item=service_item, + namespace=namespace, + k8s_service=k8s_service, + discovered_urls=discovered_urls, + ) - parent_refs = httproute_spec.get("parentRefs", []) - if not parent_refs: - continue + return discovered_urls - parent_ref = parent_refs[0] - gateway_name = parent_ref.get("name") - gateway_namespace = ( - parent_ref.get("namespace") or namespace - ) + def _discover_gateway_api_url( + self, + service_item: ResourceInventoryItem, + service_namespace: str, + gateway_items: List[ResourceInventoryItem], + httproute_items: List[ResourceInventoryItem], + namespace: str, + ) -> Optional[str]: + """Discover Gateway API URL for a service.""" + for httproute_item in httproute_items: + httproute_namespace = httproute_item.namespace or namespace + if httproute_namespace != service_namespace: + continue - matching_gateway = None - for gateway_item in gateway_items: - gateway_item_namespace = ( - gateway_item.namespace or namespace - ) - if ( - gateway_item.name == gateway_name - and gateway_item_namespace == gateway_namespace - ): - matching_gateway = self.k8s_applier.get_resource( - name=gateway_item.name, - namespace=gateway_item_namespace, - kind="Gateway", - api_version="gateway.networking.k8s.io/v1beta1", - ) - break - - if matching_gateway: - gateway_api_url = kube_utils.build_gateway_api_url( - gateway=matching_gateway, - httproute=k8s_httproute, + k8s_httproute = self.k8s_applier.get_resource( + name=httproute_item.name, + namespace=httproute_namespace, + kind="HTTPRoute", + api_version="gateway.networking.k8s.io/v1beta1", + ) + + if not k8s_httproute: + continue + + httproute_dict = normalize_resource_to_dict(k8s_httproute) + httproute_spec = httproute_dict.get("spec", {}) + httproute_rules = httproute_spec.get("rules", []) + + routes_to_service = False + for rule in httproute_rules: + backend_refs = rule.get("backendRefs", []) + for backend_ref in backend_refs: + backend_service_name = backend_ref.get("name") + backend_namespace = backend_ref.get("namespace") + if backend_namespace: + backend_namespace = ( + backend_namespace or httproute_namespace ) - if gateway_api_url: - discovered_urls["gateway_api"] = gateway_api_url + else: + backend_namespace = httproute_namespace + + if ( + backend_service_name == service_item.name + and backend_namespace == service_namespace + ): + routes_to_service = True + break + if routes_to_service: + break + + if not routes_to_service: + continue - matching_ingress = None - if not discovered_urls["ingress"]: - for ingress_item in ingress_items: - ingress_namespace = ingress_item.namespace or namespace - if ingress_namespace != service_namespace: - continue + parent_refs = httproute_spec.get("parentRefs", []) + if not parent_refs: + continue - k8s_ingress = self.k8s_applier.get_resource( - name=ingress_item.name, - namespace=ingress_namespace, - kind="Ingress", - api_version="networking.k8s.io/v1", + parent_ref = parent_refs[0] + gateway_name = parent_ref.get("name") + if not gateway_name: + continue + gateway_namespace = parent_ref.get("namespace") or namespace + + matching_gateway = None + for gateway_item in gateway_items: + gateway_item_namespace = gateway_item.namespace or namespace + if ( + gateway_item.name == gateway_name + and gateway_item_namespace == gateway_namespace + ): + matching_gateway = self.k8s_applier.get_resource( + name=gateway_item.name, + namespace=gateway_item_namespace, + kind="Gateway", + api_version="gateway.networking.k8s.io/v1beta1", ) + break - if k8s_ingress: - ingress_dict = normalize_resource_to_dict(k8s_ingress) - ingress_spec = ingress_dict.get("spec", {}) - rules = ingress_spec.get("rules", []) - for rule in rules: - http_config = rule.get("http", {}) - paths = http_config.get("paths", []) - for path_config in paths: - backend = path_config.get("backend", {}) - service_backend = backend.get("service", {}) - backend_service_name = service_backend.get( - "name" - ) - if backend_service_name == service_item.name: - matching_ingress = k8s_ingress - break - if matching_ingress: - break - if matching_ingress: - break - - if matching_ingress and not discovered_urls["ingress"]: - ingress_url = kube_utils.build_service_url( - core_api=self.k8s_core_api, - service=k8s_service, - namespace=service_item.namespace or namespace, - ingress=matching_ingress, + if matching_gateway: + return kube_utils.build_gateway_api_url( + gateway=matching_gateway, + httproute=k8s_httproute, ) - if ingress_url: - discovered_urls["ingress"] = ingress_url - - service_url = kube_utils.build_service_url( - core_api=self.k8s_core_api, - service=k8s_service, - namespace=service_item.namespace or namespace, - ingress=None, + + return None + + def _discover_ingress_url( + self, + service_item: ResourceInventoryItem, + service_namespace: str, + ingress_items: List[ResourceInventoryItem], + namespace: str, + k8s_service: Any, + ) -> Optional[str]: + """Discover Ingress URL for a service.""" + matching_ingress: Optional[Any] = None + for ingress_item in ingress_items: + ingress_namespace = ingress_item.namespace or namespace + if ingress_namespace != service_namespace: + continue + + ingress_obj = self.k8s_applier.get_resource( + name=ingress_item.name, + namespace=ingress_namespace, + kind="Ingress", + api_version="networking.k8s.io/v1", ) - if service_url: - service_dict = normalize_resource_to_dict(k8s_service) - service_type = service_dict.get("spec", {}).get( - "type", "ClusterIP" - ) + if not ingress_obj: + continue - if service_type == "LoadBalancer": - discovered_urls["load_balancer"] = service_url - elif service_type == "NodePort": - discovered_urls["node_port"] = service_url - elif service_type == "ClusterIP": - discovered_urls["cluster_ip"] = service_url + ingress_dict = normalize_resource_to_dict(ingress_obj) + ingress_spec = ingress_dict.get("spec", {}) + rules = ingress_spec.get("rules", []) + for rule in rules: + http_config = rule.get("http", {}) + paths = http_config.get("paths", []) + for path_config in paths: + backend = path_config.get("backend", {}) + service_backend = backend.get("service", {}) + backend_service_name = service_backend.get("name") + if backend_service_name == service_item.name: + matching_ingress = ingress_obj + break + if matching_ingress: + break + if matching_ingress: + break + + if not matching_ingress: + return None - return discovered_urls + return kube_utils.build_service_url( + core_api=self.k8s_core_api, + service=k8s_service, + namespace=service_item.namespace or namespace, + ingress=matching_ingress, + ) + + def _discover_service_urls( + self, + service_item: ResourceInventoryItem, + namespace: str, + k8s_service: Any, + discovered_urls: Dict[str, Optional[str]], + ) -> None: + """Discover direct service URLs (LoadBalancer/NodePort/ClusterIP).""" + service_url = kube_utils.build_service_url( + core_api=self.k8s_core_api, + service=k8s_service, + namespace=service_item.namespace or namespace, + ingress=None, + ) + + if not service_url: + return + + service_dict = normalize_resource_to_dict(k8s_service) + service_type = service_dict.get("spec", {}).get("type", "ClusterIP") + + if service_type == "LoadBalancer": + discovered_urls["load_balancer"] = service_url + elif service_type == "NodePort": + discovered_urls["node_port"] = service_url + elif service_type == "ClusterIP": + discovered_urls["cluster_ip"] = service_url def _select_url( self, @@ -1352,7 +1401,7 @@ def _select_url( deployment_name: str, ) -> Optional[str]: """Pick the URL according to preference. - + Args: discovered_urls: The URLs to choose from. settings: The settings to use. @@ -1360,7 +1409,7 @@ def _select_url( Returns: The selected URL. - + Raises: DeployerError: If the URL preference is not found. """ @@ -1394,9 +1443,13 @@ def _select_url( url = discovered_urls.get(preference.value) if not url: + discovered_available = [ + k for k, v in discovered_urls.items() if v is not None + ] raise DeployerError( f"URL preference '{preference.value}' requested but no matching URL " - f"was discovered for deployment '{deployment_name}'." + f"was discovered for deployment '{deployment_name}'. " + f"Discovered URL types: {discovered_available or 'none'}." ) return url diff --git a/src/zenml/integrations/kubernetes/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py index 5e1452010e1..715e950b7bb 100644 --- a/src/zenml/integrations/kubernetes/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -1395,18 +1395,20 @@ def build_gateway_api_url( listener_name = parent_ref.get("name") section_name = parent_ref.get("sectionName") - for listener in gateway_listeners: - listener_name_match = listener.get("name") == listener_name - if section_name: - listener_name_match = listener_name_match and ( - listener.get("name") == section_name - ) + if listener_name: + for listener in gateway_listeners: + current_listener_name = listener.get("name") + listener_name_match = current_listener_name == listener_name + if section_name: + listener_name_match = listener_name_match and ( + current_listener_name == section_name + ) - if listener_name_match: - listener_protocol = listener.get("protocol", "").lower() - if listener_protocol in ("https", "tls"): - protocol = "https" - break + if listener_name_match: + listener_protocol = listener.get("protocol", "").lower() + if listener_protocol in ("https", "tls"): + protocol = "https" + break path = "/" if httproute_rules: diff --git a/tests/integration/integrations/kubernetes/conftest.py b/tests/integration/integrations/kubernetes/conftest.py new file mode 100644 index 00000000000..99e54be43f6 --- /dev/null +++ b/tests/integration/integrations/kubernetes/conftest.py @@ -0,0 +1,14 @@ +"""Lightweight fixtures for Kubernetes integration-style unit tests.""" + +from types import SimpleNamespace +from typing import Generator, Tuple + +import pytest + + +pytest_plugins = ["tests.unit.deployers.server.conftest"] + +@pytest.fixture(scope="session", autouse=True) +def auto_environment() -> Generator[Tuple[SimpleNamespace, SimpleNamespace], None, None]: + """Override heavy env fixture to avoid provisioning in these tests.""" + yield SimpleNamespace(), SimpleNamespace() diff --git a/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py new file mode 100644 index 00000000000..ff5dd69c867 --- /dev/null +++ b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py @@ -0,0 +1,166 @@ +"""Unit tests for Kubernetes deployer URL selection and discovery.""" + +from typing import Any, Dict, Optional + +import pytest + +from zenml.deployers.exceptions import DeployerError +from zenml.enums import KubernetesServiceType, KubernetesUrlPreference +from zenml.integrations.kubernetes.deployers.kubernetes_deployer import ( + KubernetesDeployer, +) +from zenml.integrations.kubernetes.flavors.kubernetes_deployer_flavor import ( + KubernetesDeployerSettings, +) +from zenml.integrations.kubernetes.k8s_applier import ResourceInventoryItem + + +class _FakeApplier: + """Minimal fake applier to return canned resources.""" + + def __init__(self, resources: Dict[Any, Any]) -> None: + self.resources = resources + + def get_resource( + self, + name: str, + namespace: Optional[str], + kind: str, + api_version: str, + ) -> Optional[Any]: + return self.resources.get((kind, api_version, name, namespace or "")) + + +class _FakeCoreApi: + """Placeholder CoreV1Api stub (not used in these tests).""" + + pass + + +class _TestDeployer(KubernetesDeployer): + """Test subclass to override dependency properties.""" + + def __init__(self, applier: _FakeApplier) -> None: + # Skip base init; we control dependencies manually. + self._applier = applier # type: ignore[attr-defined] + + @property # type: ignore[override] + def k8s_applier(self) -> _FakeApplier: + return self._applier + + @property # type: ignore[override] + def k8s_core_api(self) -> _FakeCoreApi: + return _FakeCoreApi() + + +def _make_deployer_with_resources( + resources: Dict[Any, Any], +) -> KubernetesDeployer: + """Create a deployer instance wired with fake dependencies.""" + return _TestDeployer(applier=_FakeApplier(resources)) + + +def test_select_url_pref_not_found_includes_discovered_types() -> None: + """Explicit preference errors when URL type is missing and lists discovered.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + url_preference=KubernetesUrlPreference.INGRESS + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": "http://1.2.3.4:8000", + "node_port": None, + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + with pytest.raises(DeployerError) as excinfo: + deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + message = str(excinfo.value) + assert "ingress" in message.lower() + assert "demo" in message + assert "load_balancer" in message or "cluster_ip" in message + + +def test_select_url_auto_prefers_load_balancer() -> None: + """AUTO preference mirrors service_type ordering.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + service_type=KubernetesServiceType.LOAD_BALANCER + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": "http://1.2.3.4:8000", + "node_port": "http://node:30000", + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + url = deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + assert url == "http://1.2.3.4:8000" + + +def test_discover_urls_finds_ingress_url() -> None: + """Ingress discovery returns the ingress URL when configured.""" + service_inventory = ResourceInventoryItem( + kind="Service", + api_version="v1", + namespace="ns", + name="weather", + ) + ingress_inventory = ResourceInventoryItem( + kind="Ingress", + api_version="networking.k8s.io/v1", + namespace="ns", + name="weather-ing", + ) + + resources = { + ("Service", "v1", "weather", "ns"): { + "metadata": {"name": "weather"}, + "spec": { + "type": "ClusterIP", + "ports": [{"port": 8000}], + }, + }, + ("Ingress", "networking.k8s.io/v1", "weather-ing", "ns"): { + "spec": { + "rules": [ + { + "host": "weather.company.com", + "http": { + "paths": [ + { + "path": "/", + "backend": { + "service": {"name": "weather"}, + }, + } + ] + }, + } + ] + } + }, + } + deployer = _make_deployer_with_resources(resources) + + discovered = deployer._discover_urls( + inventory=[service_inventory, ingress_inventory], + namespace="ns", + ) + + assert discovered["ingress"] == "http://weather.company.com/" + assert ( + discovered["cluster_ip"] == "http://weather.ns.svc.cluster.local:8000" + ) diff --git a/tests/integration/integrations/kubernetes/test_kube_utils.py b/tests/integration/integrations/kubernetes/test_kube_utils.py new file mode 100644 index 00000000000..5f7b143c06c --- /dev/null +++ b/tests/integration/integrations/kubernetes/test_kube_utils.py @@ -0,0 +1,50 @@ +"""Tests for Kubernetes utility helpers.""" + +from zenml.integrations.kubernetes import kube_utils + + +def test_build_gateway_api_url_handles_https_listener() -> None: + """Gateway URL builder respects HTTPS listeners and hostnames.""" + gateway = { + "status": { + "listeners": [ + { + "name": "web", + "protocol": "HTTPS", + } + ] + } + } + httproute = { + "spec": { + "hostnames": ["api.example.com"], + "rules": [ + { + "matches": [ + { + "path": {"type": "PathPrefix", "value": "/"}, + } + ] + } + ], + "parentRefs": [{"name": "web"}], + } + } + + url = kube_utils.build_gateway_api_url( + gateway=gateway, + httproute=httproute, + ) + + assert url == "https://api.example.com/" + + +def test_build_gateway_api_url_returns_none_without_hosts() -> None: + """Gracefully returns None when no hostnames are present.""" + gateway = {"status": {"listeners": []}} + httproute = {"spec": {"hostnames": [], "rules": []}} + + assert ( + kube_utils.build_gateway_api_url(gateway=gateway, httproute=httproute) + is None + ) From 218108842986b62bbd5f7fb67d1560527f7623e8 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 25 Nov 2025 00:05:33 +0100 Subject: [PATCH 3/8] fixes --- src/zenml/cli/utils.py | 33 ++- .../deployers/kubernetes_deployer.py | 135 +++++++----- .../integrations/kubernetes/kube_utils.py | 23 +- .../integrations/kubernetes/conftest.py | 6 +- .../test_kubernetes_deployer_urls.py | 198 ++++++++++++++++++ 5 files changed, 307 insertions(+), 88 deletions(-) diff --git a/src/zenml/cli/utils.py b/src/zenml/cli/utils.py index 695cfd92d26..cbf2977740c 100644 --- a/src/zenml/cli/utils.py +++ b/src/zenml/cli/utils.py @@ -2556,34 +2556,27 @@ def pretty_print_deployment( # cURL example declare("\n[bold]cURL example:[/bold]") - if not deployment.url: - console.print("No endpoint URL available.") - else: - base_url = deployment.url.rstrip("/") - - curl_headers = [] - if auth_key: - if show_secret: - curl_headers.append( - f'-H "Authorization: Bearer {auth_key}"' - ) - else: - curl_headers.append( - '-H "Authorization: Bearer "' - ) + curl_headers = [] + if auth_key: + if show_secret: + curl_headers.append(f'-H "Authorization: Bearer {auth_key}"') + else: + curl_headers.append( + '-H "Authorization: Bearer "' + ) - curl_params = json.dumps(example, indent=2).replace("\n", "\n ") + curl_params = json.dumps(example, indent=2).replace("\n", "\n ") - curl_headers.append('-H "Content-Type: application/json"') - headers_str = "\\\n ".join(curl_headers) + curl_headers.append('-H "Content-Type: application/json"') + headers_str = "\\\n ".join(curl_headers) - curl_command = f"""curl -X POST {base_url}/invoke \\ + curl_command = f"""curl -X POST {endpoint_url}/invoke \\ {headers_str} \\ -d '{{ "parameters": {curl_params} }}'""" - console.print(curl_command) + console.print(curl_command) # JSON Schemas if show_schema: diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py index 5a55c5ee558..0376f6b7c44 100644 --- a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -91,6 +91,28 @@ MAX_LOAD_BALANCER_TIMEOUT = 600 # 10 minutes +def _filter_inventory( + inventory: List[ResourceInventoryItem], + kind: str, + api_version: str, +) -> List[ResourceInventoryItem]: + """Filter inventory items by kind and API version. + + Args: + inventory: The resource inventory to filter. + kind: The Kubernetes resource kind to filter by. + api_version: The API version to filter by. + + Returns: + Filtered list of inventory items matching the criteria. + """ + return [ + item + for item in inventory + if item.kind == kind and item.api_version == api_version + ] + + class _DeploymentCtx(BaseModel): """Deployment context.""" @@ -622,11 +644,7 @@ def _wait_for_deployment_readiness( if timeout <= 0: return - deployments = [ - item - for item in inventory - if item.kind == "Deployment" and item.api_version == "apps/v1" - ] + deployments = _filter_inventory(inventory, "Deployment", "apps/v1") for deployment_item in deployments: try: @@ -653,11 +671,7 @@ def _wait_for_deployment_readiness( ) from e if settings.wait_for_load_balancer_timeout > 0: - services = [ - item - for item in inventory - if item.kind == "Service" and item.api_version == "v1" - ] + services = _filter_inventory(inventory, "Service", "v1") lb_timeout = min( timeout, @@ -1051,11 +1065,9 @@ def _get_deployment_state_from_inventory( all_ready = True any_exists = False - deployment_items = [ - item - for item in inventory - if item.kind == "Deployment" and item.api_version == "apps/v1" - ] + deployment_items = _filter_inventory( + inventory, "Deployment", "apps/v1" + ) for deployment_item in deployment_items: k8s_deployment = self.k8s_applier.get_resource( @@ -1161,29 +1173,16 @@ def _discover_urls( "cluster_ip": None, } - service_items = [ - item - for item in inventory - if item.kind == "Service" and item.api_version == "v1" - ] - ingress_items = [ - item - for item in inventory - if item.kind == "Ingress" - and item.api_version == "networking.k8s.io/v1" - ] - gateway_items = [ - item - for item in inventory - if item.kind == "Gateway" - and item.api_version == "gateway.networking.k8s.io/v1beta1" - ] - httproute_items = [ - item - for item in inventory - if item.kind == "HTTPRoute" - and item.api_version == "gateway.networking.k8s.io/v1beta1" - ] + service_items = _filter_inventory(inventory, "Service", "v1") + ingress_items = _filter_inventory( + inventory, "Ingress", "networking.k8s.io/v1" + ) + gateway_items = _filter_inventory( + inventory, "Gateway", "gateway.networking.k8s.io/v1beta1" + ) + httproute_items = _filter_inventory( + inventory, "HTTPRoute", "gateway.networking.k8s.io/v1beta1" + ) for service_item in service_items: k8s_service = self.k8s_applier.get_resource( @@ -1235,7 +1234,21 @@ def _discover_gateway_api_url( httproute_items: List[ResourceInventoryItem], namespace: str, ) -> Optional[str]: - """Discover Gateway API URL for a service.""" + """Discover Gateway API URL for a service. + + Searches HTTPRoutes that reference the given service and finds the + corresponding Gateway to build the URL. + + Args: + service_item: The service inventory item to find a URL for. + service_namespace: The namespace of the service. + gateway_items: List of Gateway resources in the inventory. + httproute_items: List of HTTPRoute resources in the inventory. + namespace: The default namespace for resources without explicit namespace. + + Returns: + The Gateway API URL if found, None otherwise. + """ for httproute_item in httproute_items: httproute_namespace = httproute_item.namespace or namespace if httproute_namespace != service_namespace: @@ -1260,13 +1273,9 @@ def _discover_gateway_api_url( backend_refs = rule.get("backendRefs", []) for backend_ref in backend_refs: backend_service_name = backend_ref.get("name") - backend_namespace = backend_ref.get("namespace") - if backend_namespace: - backend_namespace = ( - backend_namespace or httproute_namespace - ) - else: - backend_namespace = httproute_namespace + backend_namespace = ( + backend_ref.get("namespace") or httproute_namespace + ) if ( backend_service_name == service_item.name @@ -1321,7 +1330,21 @@ def _discover_ingress_url( namespace: str, k8s_service: Any, ) -> Optional[str]: - """Discover Ingress URL for a service.""" + """Discover Ingress URL for a service. + + Searches Ingress resources that route to the given service and builds + the URL from the Ingress host and path configuration. + + Args: + service_item: The service inventory item to find a URL for. + service_namespace: The namespace of the service. + ingress_items: List of Ingress resources in the inventory. + namespace: The default namespace for resources without explicit namespace. + k8s_service: The Kubernetes Service resource object. + + Returns: + The Ingress URL if found, None otherwise. + """ matching_ingress: Optional[Any] = None for ingress_item in ingress_items: ingress_namespace = ingress_item.namespace or namespace @@ -1373,7 +1396,17 @@ def _discover_service_urls( k8s_service: Any, discovered_urls: Dict[str, Optional[str]], ) -> None: - """Discover direct service URLs (LoadBalancer/NodePort/ClusterIP).""" + """Discover direct service URLs based on service type. + + Populates the discovered_urls dict with LoadBalancer, NodePort, or + ClusterIP URLs based on the service type. + + Args: + service_item: The service inventory item. + namespace: The default namespace. + k8s_service: The Kubernetes Service resource object. + discovered_urls: Dict to populate with discovered URLs (mutated in place). + """ service_url = kube_utils.build_service_url( core_api=self.k8s_core_api, service=k8s_service, @@ -1387,11 +1420,11 @@ def _discover_service_urls( service_dict = normalize_resource_to_dict(k8s_service) service_type = service_dict.get("spec", {}).get("type", "ClusterIP") - if service_type == "LoadBalancer": + if service_type == KubernetesServiceType.LOAD_BALANCER.value: discovered_urls["load_balancer"] = service_url - elif service_type == "NodePort": + elif service_type == KubernetesServiceType.NODE_PORT.value: discovered_urls["node_port"] = service_url - elif service_type == "ClusterIP": + elif service_type == KubernetesServiceType.CLUSTER_IP.value: discovered_urls["cluster_ip"] = service_url def _select_url( diff --git a/src/zenml/integrations/kubernetes/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py index 715e950b7bb..080d6e4b03c 100644 --- a/src/zenml/integrations/kubernetes/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -1392,23 +1392,18 @@ def build_gateway_api_url( parent_refs = httproute_spec.get("parentRefs", []) if parent_refs: parent_ref = parent_refs[0] - listener_name = parent_ref.get("name") section_name = parent_ref.get("sectionName") - if listener_name: - for listener in gateway_listeners: - current_listener_name = listener.get("name") - listener_name_match = current_listener_name == listener_name - if section_name: - listener_name_match = listener_name_match and ( - current_listener_name == section_name - ) + for listener in gateway_listeners: + current_listener_name = listener.get("name") + + if section_name and current_listener_name != section_name: + continue - if listener_name_match: - listener_protocol = listener.get("protocol", "").lower() - if listener_protocol in ("https", "tls"): - protocol = "https" - break + listener_protocol = listener.get("protocol", "").lower() + if listener_protocol in ("https", "tls"): + protocol = "https" + break path = "/" if httproute_rules: diff --git a/tests/integration/integrations/kubernetes/conftest.py b/tests/integration/integrations/kubernetes/conftest.py index 99e54be43f6..aa9fa84b5c5 100644 --- a/tests/integration/integrations/kubernetes/conftest.py +++ b/tests/integration/integrations/kubernetes/conftest.py @@ -1,14 +1,14 @@ """Lightweight fixtures for Kubernetes integration-style unit tests.""" from types import SimpleNamespace -from typing import Generator, Tuple +from typing import Iterator, Tuple import pytest - pytest_plugins = ["tests.unit.deployers.server.conftest"] + @pytest.fixture(scope="session", autouse=True) -def auto_environment() -> Generator[Tuple[SimpleNamespace, SimpleNamespace], None, None]: +def auto_environment() -> Iterator[Tuple[SimpleNamespace, SimpleNamespace]]: """Override heavy env fixture to avoid provisioning in these tests.""" yield SimpleNamespace(), SimpleNamespace() diff --git a/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py index ff5dd69c867..d0df82bf643 100644 --- a/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py +++ b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py @@ -164,3 +164,201 @@ def test_discover_urls_finds_ingress_url() -> None: assert ( discovered["cluster_ip"] == "http://weather.ns.svc.cluster.local:8000" ) + + +def test_select_url_auto_prefers_node_port_when_configured() -> None: + """AUTO preference with NodePort service type prefers NodePort URLs.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + service_type=KubernetesServiceType.NODE_PORT + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": "http://1.2.3.4:8000", + "node_port": "http://node:30000", + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + url = deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + assert url == "http://node:30000" + + +def test_select_url_auto_prefers_cluster_ip_when_configured() -> None: + """AUTO preference with ClusterIP service type prefers ClusterIP URLs.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + service_type=KubernetesServiceType.CLUSTER_IP + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": "http://1.2.3.4:8000", + "node_port": "http://node:30000", + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + url = deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + assert url == "http://svc.ns.svc.cluster.local:8000" + + +def test_select_url_auto_falls_back_when_preferred_unavailable() -> None: + """AUTO preference falls back to next priority when preferred is missing.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + service_type=KubernetesServiceType.LOAD_BALANCER + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": None, # Not available + "node_port": "http://node:30000", + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + url = deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + # Falls back to NodePort + assert url == "http://node:30000" + + +def test_select_url_explicit_preference_returns_matching_url() -> None: + """Explicit preference returns the requested URL type.""" + deployer = KubernetesDeployer.__new__(KubernetesDeployer) + settings = KubernetesDeployerSettings( + url_preference=KubernetesUrlPreference.CLUSTER_IP + ) + discovered = { + "gateway_api": None, + "ingress": None, + "load_balancer": "http://1.2.3.4:8000", + "node_port": "http://node:30000", + "cluster_ip": "http://svc.ns.svc.cluster.local:8000", + } + + url = deployer._select_url( + discovered_urls=discovered, + settings=settings, + deployment_name="demo", + ) + + assert url == "http://svc.ns.svc.cluster.local:8000" + + +def test_discover_urls_finds_gateway_api_url() -> None: + """Gateway API discovery returns URL when HTTPRoute and Gateway are configured.""" + service_inventory = ResourceInventoryItem( + kind="Service", + api_version="v1", + namespace="ns", + name="api-service", + ) + gateway_inventory = ResourceInventoryItem( + kind="Gateway", + api_version="gateway.networking.k8s.io/v1beta1", + namespace="ns", + name="main-gateway", + ) + httproute_inventory = ResourceInventoryItem( + kind="HTTPRoute", + api_version="gateway.networking.k8s.io/v1beta1", + namespace="ns", + name="api-route", + ) + + resources = { + ("Service", "v1", "api-service", "ns"): { + "metadata": {"name": "api-service"}, + "spec": { + "type": "ClusterIP", + "ports": [{"port": 8000}], + }, + }, + ( + "Gateway", + "gateway.networking.k8s.io/v1beta1", + "main-gateway", + "ns", + ): { + "status": { + "listeners": [ + { + "name": "http", + "protocol": "HTTP", + } + ] + } + }, + ( + "HTTPRoute", + "gateway.networking.k8s.io/v1beta1", + "api-route", + "ns", + ): { + "spec": { + "hostnames": ["api.example.com"], + "parentRefs": [{"name": "main-gateway", "namespace": "ns"}], + "rules": [ + { + "matches": [ + {"path": {"type": "PathPrefix", "value": "/"}} + ], + "backendRefs": [ + {"name": "api-service", "namespace": "ns"} + ], + } + ], + } + }, + } + deployer = _make_deployer_with_resources(resources) + + discovered = deployer._discover_urls( + inventory=[service_inventory, gateway_inventory, httproute_inventory], + namespace="ns", + ) + + assert discovered["gateway_api"] == "http://api.example.com/" + assert ( + discovered["cluster_ip"] + == "http://api-service.ns.svc.cluster.local:8000" + ) + + +def test_discover_urls_returns_none_when_applier_returns_none() -> None: + """Discovery returns empty URLs when applier cannot find resources.""" + service_inventory = ResourceInventoryItem( + kind="Service", + api_version="v1", + namespace="ns", + name="missing-service", + ) + + # Empty resources - applier will return None for everything + resources: Dict[Any, Any] = {} + deployer = _make_deployer_with_resources(resources) + + discovered = deployer._discover_urls( + inventory=[service_inventory], + namespace="ns", + ) + + assert discovered["gateway_api"] is None + assert discovered["ingress"] is None + assert discovered["load_balancer"] is None + assert discovered["node_port"] is None + assert discovered["cluster_ip"] is None From b35c79ba41451d0be1743b913cd08a91eee0297c Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Wed, 3 Dec 2025 10:33:28 +0000 Subject: [PATCH 4/8] Auto-update of Starter template --- .github/workflows/require-release-label.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/require-release-label.yml b/.github/workflows/require-release-label.yml index 66be8d3bd5d..9c6762789a9 100644 --- a/.github/workflows/require-release-label.yml +++ b/.github/workflows/require-release-label.yml @@ -1,12 +1,11 @@ +--- # Requires PRs to have either 'release-notes' or 'no-release-notes' label # This ensures release notes are considered for every PR before merging. # The check is enforced via branch protection rules on develop. name: Require Release Label - on: pull_request: types: [opened, labeled, unlabeled, synchronize] - jobs: check-label: if: github.repository == 'zenml-io/zenml' @@ -17,8 +16,8 @@ jobs: with: mode: exactly count: 1 - labels: "release-notes, no-release-notes" - message: | + labels: release-notes, no-release-notes + message: |- This PR is missing a release label. Please add one of: - `release-notes` - if this PR has user-facing changes that should appear in the changelog - `no-release-notes` - if this is an internal change (refactoring, tests, CI, etc.) From 6774a9f4accb077382533a390e7c4f37c2ae5882 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Fri, 5 Dec 2025 12:52:12 +0100 Subject: [PATCH 5/8] fix based on claude review --- src/zenml/integrations/kubernetes/kube_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/zenml/integrations/kubernetes/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py index 080d6e4b03c..f9e81005fba 100644 --- a/src/zenml/integrations/kubernetes/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -1396,6 +1396,8 @@ def build_gateway_api_url( for listener in gateway_listeners: current_listener_name = listener.get("name") + if not current_listener_name: + continue if section_name and current_listener_name != section_name: continue From ede0ef3d40466c00b78673292e3b9f6802ae042b Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Thu, 11 Dec 2025 09:59:13 +0100 Subject: [PATCH 6/8] url review fixes --- .../integrations/kubernetes/constants.py | 6 + .../deployers/kubernetes_deployer.py | 41 +++-- .../kubernetes/gateway_api_models.py | 162 ++++++++++++++++++ .../integrations/kubernetes/kube_utils.py | 59 ++++--- .../kubernetes/serialization_utils.py | 48 +++++- .../test_kubernetes_deployer_urls.py | 8 +- 6 files changed, 273 insertions(+), 51 deletions(-) create mode 100644 src/zenml/integrations/kubernetes/gateway_api_models.py diff --git a/src/zenml/integrations/kubernetes/constants.py b/src/zenml/integrations/kubernetes/constants.py index 80b075822b2..a01f0af52aa 100644 --- a/src/zenml/integrations/kubernetes/constants.py +++ b/src/zenml/integrations/kubernetes/constants.py @@ -25,3 +25,9 @@ ORCHESTRATOR_RUN_ID_ANNOTATION_KEY = "zenml.io/orchestrator-run-id" STEP_NAME_ANNOTATION_KEY = "zenml.io/step-name" STEP_OPERATOR_ANNOTATION_KEY = "zenml.io/step-operator" + +# Gateway API versions +GATEWAY_API_VERSIONS = ( + "gateway.networking.k8s.io/v1", + "gateway.networking.k8s.io/v1beta1", +) diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py index 38b0f3ff44c..841822489bb 100644 --- a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -57,6 +57,7 @@ KubernetesDeployerConfig, KubernetesDeployerSettings, ) +from zenml.integrations.kubernetes.gateway_api_models import HTTPRouteSpec from zenml.integrations.kubernetes.k8s_applier import ( KubernetesApplier, ProvisioningError, @@ -71,6 +72,7 @@ ) from zenml.integrations.kubernetes.serialization_utils import ( normalize_resource_to_dict, + parse_gateway_api_resource, ) from zenml.integrations.kubernetes.template_engine import ( KubernetesTemplateEngine, @@ -1255,27 +1257,33 @@ def _discover_gateway_api_url( name=httproute_item.name, namespace=httproute_namespace, kind="HTTPRoute", - api_version="gateway.networking.k8s.io/v1beta1", + api_version=httproute_item.api_version, ) if not k8s_httproute: continue - httproute_dict = normalize_resource_to_dict(k8s_httproute) - httproute_spec = httproute_dict.get("spec", {}) - httproute_rules = httproute_spec.get("rules", []) + try: + httproute_dict = normalize_resource_to_dict(k8s_httproute) + httproute_spec_dict = httproute_dict.get("spec", {}) + httproute_spec = parse_gateway_api_resource( + httproute_spec_dict, HTTPRouteSpec + ) + except ValueError as e: + logger.warning( + f"Failed to parse HTTPRoute {httproute_item.name}: {e}" + ) + continue routes_to_service = False - for rule in httproute_rules: - backend_refs = rule.get("backendRefs", []) - for backend_ref in backend_refs: - backend_service_name = backend_ref.get("name") + for rule in httproute_spec.rules: + for backend_ref in rule.backend_refs: backend_namespace = ( - backend_ref.get("namespace") or httproute_namespace + backend_ref.namespace or httproute_namespace ) if ( - backend_service_name == service_item.name + backend_ref.name == service_item.name and backend_namespace == service_namespace ): routes_to_service = True @@ -1286,15 +1294,12 @@ def _discover_gateway_api_url( if not routes_to_service: continue - parent_refs = httproute_spec.get("parentRefs", []) - if not parent_refs: + if not httproute_spec.parent_refs: continue - parent_ref = parent_refs[0] - gateway_name = parent_ref.get("name") - if not gateway_name: - continue - gateway_namespace = parent_ref.get("namespace") or namespace + parent_ref = httproute_spec.parent_refs[0] + gateway_name = parent_ref.name + gateway_namespace = parent_ref.namespace or namespace matching_gateway = None for gateway_item in gateway_items: @@ -1307,7 +1312,7 @@ def _discover_gateway_api_url( name=gateway_item.name, namespace=gateway_item_namespace, kind="Gateway", - api_version="gateway.networking.k8s.io/v1beta1", + api_version=gateway_item.api_version, ) break diff --git a/src/zenml/integrations/kubernetes/gateway_api_models.py b/src/zenml/integrations/kubernetes/gateway_api_models.py new file mode 100644 index 00000000000..e7b6a713176 --- /dev/null +++ b/src/zenml/integrations/kubernetes/gateway_api_models.py @@ -0,0 +1,162 @@ +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Pydantic models for Kubernetes Gateway API resources. + +This module provides type-safe models for Gateway API resources (Gateway and +HTTPRoute) to replace untyped Dict[str, Any] access patterns with validated +attribute access. + +Gateway API is the newer Kubernetes standard for ingress/routing, using: +- Gateway: Entry point with listeners (replaces LoadBalancer/NodePort exposure) +- HTTPRoute: Routing rules that attach to Gateways (replaces Ingress) + +Note: + The Kubernetes Python client library does not include typed models for + Gateway API resources because Gateway API is a CRD-based extension, not + part of core Kubernetes. These models provide the type safety needed for + the fields we use in URL building. +""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class HTTPPathMatch(BaseModel): + """Path match configuration for HTTPRoute. + + Attributes: + value: Path value to match (e.g., "/api/v1"). + type: Match type - "Exact", "PathPrefix", or "RegularExpression". + """ + + value: str = "/" + type: Optional[str] = None + + model_config = ConfigDict(populate_by_name=True) + + +class HTTPRouteMatch(BaseModel): + """Match criteria for HTTPRoute rule. + + Attributes: + path: Optional path match configuration. + """ + + path: Optional[HTTPPathMatch] = None + + model_config = ConfigDict(populate_by_name=True) + + +class BackendRef(BaseModel): + """Backend reference in HTTPRoute rule. + + Attributes: + name: Name of the backend service. + namespace: Namespace of the backend service. + port: Port number on the backend service. + weight: Traffic weight for load balancing. + """ + + name: str + namespace: Optional[str] = None + port: Optional[int] = None + weight: Optional[int] = None + + model_config = ConfigDict(populate_by_name=True) + + +class HTTPRouteRule(BaseModel): + """Routing rule in HTTPRoute. + + Attributes: + matches: List of match conditions for this rule. + backend_refs: List of backend services to route traffic to. + """ + + matches: List[HTTPRouteMatch] = Field(default_factory=list) + backend_refs: List[BackendRef] = Field( + default_factory=list, alias="backendRefs" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class ParentReference(BaseModel): + """Reference to parent Gateway. + + Attributes: + name: Name of the parent Gateway. + namespace: Namespace of the parent Gateway. + section_name: Name of specific Gateway listener to attach to. + """ + + name: str + namespace: Optional[str] = None + section_name: Optional[str] = Field(None, alias="sectionName") + + model_config = ConfigDict(populate_by_name=True) + + +class HTTPRouteSpec(BaseModel): + """HTTPRoute specification. + + Attributes: + hostnames: List of hostnames this route matches. + parent_refs: List of parent Gateways this route attaches to. + rules: List of routing rules with match conditions and backends. + """ + + hostnames: List[str] = Field(default_factory=list) + parent_refs: List[ParentReference] = Field( + default_factory=list, alias="parentRefs" + ) + rules: List[HTTPRouteRule] = Field(default_factory=list) + + model_config = ConfigDict(populate_by_name=True) + + +# Gateway Models + + +class GatewayListener(BaseModel): + """Gateway listener configuration. + + Attributes: + name: Unique name for this listener. + protocol: Protocol - "HTTP", "HTTPS", "TLS", "TCP", or "UDP". + port: Port number the listener binds to. + hostname: Hostname the listener accepts. + """ + + name: str + protocol: str + port: Optional[int] = None + hostname: Optional[str] = None + + model_config = ConfigDict(populate_by_name=True) + + +class GatewayStatus(BaseModel): + """Gateway status information. + + Attributes: + listeners: List of listener configurations and their status. + addresses: List of addresses where the Gateway is accessible. + """ + + listeners: List[GatewayListener] = Field(default_factory=list) + addresses: List[Dict[str, Any]] = Field(default_factory=list) + + model_config = ConfigDict(populate_by_name=True) diff --git a/src/zenml/integrations/kubernetes/kube_utils.py b/src/zenml/integrations/kubernetes/kube_utils.py index f9e81005fba..1147458503c 100644 --- a/src/zenml/integrations/kubernetes/kube_utils.py +++ b/src/zenml/integrations/kubernetes/kube_utils.py @@ -58,6 +58,10 @@ from zenml.integrations.kubernetes.constants import ( STEP_NAME_ANNOTATION_KEY, ) +from zenml.integrations.kubernetes.gateway_api_models import ( + GatewayStatus, + HTTPRouteSpec, +) from zenml.integrations.kubernetes.manifest_utils import ( build_namespace_manifest, build_role_binding_manifest_for_service_account, @@ -67,6 +71,7 @@ from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings from zenml.integrations.kubernetes.serialization_utils import ( normalize_resource_to_dict, + parse_gateway_api_resource, ) from zenml.logger import get_logger from zenml.utils.time_utils import utc_now @@ -1376,45 +1381,45 @@ def build_gateway_api_url( gateway_dict = normalize_resource_to_dict(gateway) httproute_dict = normalize_resource_to_dict(httproute) - gateway_status = gateway_dict.get("status", {}) - gateway_listeners = gateway_status.get("listeners", []) + try: + gateway_status_dict = gateway_dict.get("status", {}) + gateway_status = parse_gateway_api_resource( + {"listeners": gateway_status_dict.get("listeners", [])}, + GatewayStatus, + ) - httproute_spec = httproute_dict.get("spec", {}) - httproute_hostnames = httproute_spec.get("hostnames", []) - httproute_rules = httproute_spec.get("rules", []) + httproute_spec_dict = httproute_dict.get("spec", {}) + httproute_spec = parse_gateway_api_resource( + httproute_spec_dict, HTTPRouteSpec + ) + except ValueError as e: + logger.warning(f"Failed to parse Gateway API resources: {e}") + return None - if not httproute_hostnames or not httproute_rules: + if not httproute_spec.hostnames or not httproute_spec.rules: return None - hostname = httproute_hostnames[0] + hostname = httproute_spec.hostnames[0] protocol = "http" - parent_refs = httproute_spec.get("parentRefs", []) - if parent_refs: - parent_ref = parent_refs[0] - section_name = parent_ref.get("sectionName") - - for listener in gateway_listeners: - current_listener_name = listener.get("name") - if not current_listener_name: - continue + if httproute_spec.parent_refs: + parent_ref = httproute_spec.parent_refs[0] + section_name = parent_ref.section_name - if section_name and current_listener_name != section_name: + for listener in gateway_status.listeners: + if section_name and listener.name != section_name: continue - listener_protocol = listener.get("protocol", "").lower() - if listener_protocol in ("https", "tls"): + if listener.protocol.lower() in ("https", "tls"): protocol = "https" break path = "/" - if httproute_rules: - first_rule = httproute_rules[0] - matches = first_rule.get("matches", []) - if matches: - first_match = matches[0] - match_path = first_match.get("path") - if match_path: - path = match_path.get("value", "/") + if httproute_spec.rules: + first_rule = httproute_spec.rules[0] + if first_rule.matches: + first_match = first_rule.matches[0] + if first_match.path: + path = first_match.path.value return f"{protocol}://{hostname}{path}" diff --git a/src/zenml/integrations/kubernetes/serialization_utils.py b/src/zenml/integrations/kubernetes/serialization_utils.py index e152394e75d..718ae2d0648 100644 --- a/src/zenml/integrations/kubernetes/serialization_utils.py +++ b/src/zenml/integrations/kubernetes/serialization_utils.py @@ -15,10 +15,12 @@ import re from datetime import date, datetime -from typing import TYPE_CHECKING, Any, Dict, List, Type, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union, cast if TYPE_CHECKING: - pass + from pydantic import BaseModel + +T = TypeVar("T", bound="BaseModel") def normalize_resource_to_dict( @@ -268,3 +270,45 @@ def _deserialize_dict(data: Any, class_name: str) -> Dict[str, Any]: } else: return data + + +def parse_gateway_api_resource( + resource: Union[Dict[str, Any], Any], + model_class: Type[T], +) -> T: + """Parse Gateway API resource into typed Pydantic model. + + Converts raw Kubernetes Gateway API resources (Gateway, HTTPRoute) into + validated Pydantic models for type-safe attribute access. + + Args: + resource: Raw K8s resource (dict, typed model, or dynamic resource). + model_class: Target Pydantic model class (e.g., GatewayStatus or + HTTPRouteSpec from gateway_api_models). + + Returns: + Validated Pydantic model instance with type safety. + + Raises: + ValueError: If resource cannot be normalized or validation fails. + """ + from pydantic import ValidationError + + try: + resource_dict = normalize_resource_to_dict(resource) + except ValueError as e: + raise ValueError( + f"Failed to normalize {model_class.__name__} resource to dict: {e}" + ) from e + + try: + return model_class.model_validate(resource_dict) + except ValidationError as e: + raise ValueError( + f"Failed to parse {model_class.__name__} from resource dict. " + f"Validation errors: {e}" + ) from e + except Exception as e: + raise ValueError( + f"Unexpected error parsing {model_class.__name__}: {e}" + ) from e diff --git a/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py index d0df82bf643..e5143ae2d74 100644 --- a/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py +++ b/tests/integration/integrations/kubernetes/deployers/test_kubernetes_deployer_urls.py @@ -269,13 +269,13 @@ def test_discover_urls_finds_gateway_api_url() -> None: ) gateway_inventory = ResourceInventoryItem( kind="Gateway", - api_version="gateway.networking.k8s.io/v1beta1", + api_version="gateway.networking.k8s.io/v1", namespace="ns", name="main-gateway", ) httproute_inventory = ResourceInventoryItem( kind="HTTPRoute", - api_version="gateway.networking.k8s.io/v1beta1", + api_version="gateway.networking.k8s.io/v1", namespace="ns", name="api-route", ) @@ -290,7 +290,7 @@ def test_discover_urls_finds_gateway_api_url() -> None: }, ( "Gateway", - "gateway.networking.k8s.io/v1beta1", + "gateway.networking.k8s.io/v1", "main-gateway", "ns", ): { @@ -305,7 +305,7 @@ def test_discover_urls_finds_gateway_api_url() -> None: }, ( "HTTPRoute", - "gateway.networking.k8s.io/v1beta1", + "gateway.networking.k8s.io/v1", "api-route", "ns", ): { From 32ef6c57c33f865ca539a00179ff861be107388d Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Thu, 11 Dec 2025 10:10:35 +0100 Subject: [PATCH 7/8] fix utils --- src/zenml/cli/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/cli/utils.py b/src/zenml/cli/utils.py index 43300dc4b0c..330567b1de5 100644 --- a/src/zenml/cli/utils.py +++ b/src/zenml/cli/utils.py @@ -2421,7 +2421,7 @@ def pretty_print_deployment( curl_headers.append('-H "Content-Type: application/json"') headers_str = "\\\n ".join(curl_headers) - curl_command = f"""curl -X POST {endpoint_url}/invoke \\ + curl_command = f"""curl -X POST {deployment.url}/invoke \\ {headers_str} \\ -d '{{ "parameters": {curl_params} From 2001b4ae96be953f81e65a567b8868c9be9e8496 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Thu, 11 Dec 2025 11:34:10 +0100 Subject: [PATCH 8/8] mypy issues --- .../deployers/kubernetes_deployer.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py index 841822489bb..29bd3de4cf7 100644 --- a/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py +++ b/src/zenml/integrations/kubernetes/deployers/kubernetes_deployer.py @@ -53,6 +53,7 @@ StackComponentType, ) from zenml.integrations.kubernetes import kube_utils +from zenml.integrations.kubernetes.constants import GATEWAY_API_VERSIONS from zenml.integrations.kubernetes.flavors.kubernetes_deployer_flavor import ( KubernetesDeployerConfig, KubernetesDeployerSettings, @@ -1176,12 +1177,18 @@ def _discover_urls( ingress_items = _filter_inventory( inventory, "Ingress", "networking.k8s.io/v1" ) - gateway_items = _filter_inventory( - inventory, "Gateway", "gateway.networking.k8s.io/v1beta1" - ) - httproute_items = _filter_inventory( - inventory, "HTTPRoute", "gateway.networking.k8s.io/v1beta1" - ) + gateway_items = [ + item + for item in inventory + if item.kind == "Gateway" + and item.api_version in GATEWAY_API_VERSIONS + ] + httproute_items = [ + item + for item in inventory + if item.kind == "HTTPRoute" + and item.api_version in GATEWAY_API_VERSIONS + ] for service_item in service_items: k8s_service = self.k8s_applier.get_resource(