diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5a016fd6a..817ba5fc9 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -204,6 +204,10 @@ path = "pod_portforward_bind.rs" name = "pod_resize" path = "pod_resize.rs" +[[example]] +name = "pod_resize_wait" +path = "pod_resize_wait.rs" + [[example]] name = "pod_reflector" path = "pod_reflector.rs" diff --git a/examples/pod_resize_wait.rs b/examples/pod_resize_wait.rs new file mode 100644 index 000000000..5c41ba033 --- /dev/null +++ b/examples/pod_resize_wait.rs @@ -0,0 +1,155 @@ +use k8s_openapi::api::core::v1::Pod; +use kube::{ + Client, ResourceExt, + api::{Api, DeleteParams, Patch, PatchParams, PostParams}, + runtime::wait::{await_condition, conditions}, +}; +use tracing::*; + +fn inspect_pod_resize(pod: &Pod) { + if let Some(spec) = &pod.spec { + if let Some(container) = spec.containers.first() { + info!("Spec resources (desired): {:?}", container.resources); + } + } + if let Some(status) = &pod.status { + info!("Resize status: {:?}", status.resize); + if let Some(container_status) = status.container_statuses.as_ref().and_then(|cs| cs.first()) { + info!("Status resources (actual): {:?}", container_status.resources); + info!("Container restart count: {}", container_status.restart_count); + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let pods: Api = Api::default_namespaced(client); + + // Create a sample pod with resource limits and resize policy + info!("Creating pod with initial resource requirements"); + let pod_template = serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { "name": "resize-wait-demo" }, + "spec": { + "containers": [{ + "name": "app", + "image": "alpine:3.23", + "resizePolicy": [ + { + "resourceName": "cpu", + "restartPolicy": "NotRequired" + }, + { + "resourceName": "memory", + "restartPolicy": "RestartContainer" + } + ], + "resources": { + "requests": { + "cpu": "100m", + "memory": "128Mi" + }, + "limits": { + "cpu": "200m", + "memory": "256Mi" + } + } + }] + } + }); + let pod = serde_json::from_value(pod_template)?; + let pp = PostParams::default(); + match pods.create(&pp, &pod).await { + Ok(created) => info!("Created pod: {}", created.name_any()), + Err(kube::Error::Api(ae)) if ae.code == 409 => { + info!("Pod already exists, patching it..."); + let pp = PatchParams::apply("pod-resize-example"); + let patch = Patch::Apply(pod); + let _ = pods.patch("resize-wait-demo", &pp, &patch).await?; + } + Err(e) => return Err(e.into()), + } + // Wait for pod to be running + info!("Waiting for pod to be running..."); + let running = await_condition(pods.clone(), "resize-wait-demo", conditions::is_pod_running()); + tokio::time::timeout(std::time::Duration::from_secs(60), running).await??; + info!("✓ Pod is running"); + + // Display initial resources + let current = pods.get("resize-wait-demo").await?; + info!("Initial pod state:"); + inspect_pod_resize(¤t); + + // Resize CPU (no restart required) + info!("\n--- Example 1: Resizing CPU (NotRequired restart policy) ---"); + let cpu_patch = serde_json::json!({ + "spec": { + "containers": [{ + "name": "app", + "resources": { + "requests": { + "cpu": "150m" + }, + "limits": { + "cpu": "300m" + } + } + }] + } + }); + + let patch_params = PatchParams::default(); + info!("Patching pod with new CPU resources..."); + pods.patch_resize("resize-wait-demo", &patch_params, &Patch::Strategic(cpu_patch)) + .await?; + + info!("Waiting for resize to complete..."); + let resized = await_condition(pods.clone(), "resize-wait-demo", conditions::is_pod_resized()); + let pod = tokio::time::timeout(std::time::Duration::from_secs(30), resized).await??; + info!("✓ Pod CPU resize completed successfully!"); + if let Some(pod) = pod { + inspect_pod_resize(&pod); + } + + // Resize Memory (restart required) + info!("\n--- Example 2: Resizing Memory (RestartContainer policy) ---"); + let mem_patch = serde_json::json!({ + "spec": { + "containers": [{ + "name": "app", + "resources": { + "requests": { + "memory": "192Mi" + }, + "limits": { + "memory": "384Mi" + } + } + }] + } + }); + + info!("Patching pod with new memory resources..."); + pods.patch_resize("resize-wait-demo", &patch_params, &Patch::Strategic(mem_patch)) + .await?; + + info!("Waiting for memory resize to complete (container will restart)..."); + let resized = await_condition(pods.clone(), "resize-wait-demo", conditions::is_pod_resized()); + let pod = tokio::time::timeout(std::time::Duration::from_secs(60), resized).await??; + info!("✓ Pod memory resize completed successfully!"); + if let Some(pod) = pod { + inspect_pod_resize(&pod); + } + + // Cleanup + info!("\nCleaning up..."); + let dp = DeleteParams::default(); + pods.delete("resize-wait-demo", &dp).await?; + info!("Pod deleted"); + + Ok(()) +} diff --git a/kube-runtime/src/wait.rs b/kube-runtime/src/wait.rs index 4c461c5fc..53f0fa774 100644 --- a/kube-runtime/src/wait.rs +++ b/kube-runtime/src/wait.rs @@ -222,6 +222,46 @@ pub mod conditions { } } + /// An await condition for `Pod` that returns `true` once an in-place resize operation has completed + /// + /// A resize is considered complete when, for every container in `spec.containers`, + /// the corresponding entry in `status.containerStatuses` reports the same `resources` + /// (both `requests` and `limits`). + /// + /// This avoids a race condition where checking only for the absence of + /// `PodResizePending`/`PodResizeInProgress` conditions could return `true` before the + /// kubelet has even started processing the resize. + /// + /// See: + #[must_use] + pub fn is_pod_resized() -> impl Condition { + |obj: Option<&Pod>| { + if let Some(pod) = obj + && let Some(status) = &pod.status + && let Some(spec) = &pod.spec + && let Some(container_statuses) = &status.container_statuses + { + return spec.containers.iter().all(|container| { + container_statuses + .iter() + .find(|cs| cs.name == container.name) + .is_some_and(|cs| { + let spec_resources = container.resources.as_ref(); + let status_resources = cs.resources.as_ref(); + + let requests_match = spec_resources.and_then(|r| r.requests.as_ref()) + == status_resources.and_then(|r| r.requests.as_ref()); + let limits_match = spec_resources.and_then(|r| r.limits.as_ref()) + == status_resources.and_then(|r| r.limits.as_ref()); + + requests_match && limits_match + }) + }); + } + false + } + } + /// An await condition for `Job` that returns `true` once it is completed #[must_use] pub fn is_job_completed() -> impl Condition { @@ -546,6 +586,273 @@ pub mod conditions { assert!(!is_pod_running().matches_object(None)) } + #[test] + /// pass when spec and status resources match (resize complete) + fn pod_resized_complete() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + memory: "256Mi" + limits: + cpu: "400m" + memory: "512Mi" + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + memory: "256Mi" + limits: + cpu: "400m" + memory: "512Mi" + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// pass when containers have no resources defined + fn pod_resized_no_resources() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// fail if pod does not exist + fn pod_resized_missing() { + use super::{Condition, is_pod_resized}; + + assert!(!is_pod_resized().matches_object(None)) + } + + #[test] + /// fail when spec resources differ from status (resize in progress) + fn pod_resize_in_progress() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + memory: "256Mi" + limits: + cpu: "400m" + memory: "512Mi" + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "100m" + memory: "128Mi" + limits: + cpu: "200m" + memory: "256Mi" + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(!is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// fail when only limits differ (partial resize) + fn pod_resize_limits_differ() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + limits: + cpu: "400m" + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + limits: + cpu: "200m" + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(!is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// fail when container status is missing for a spec container + fn pod_resize_missing_container_status() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + status: + phase: Running + containerStatuses: + - name: other + image: alpine:3.23 + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(!is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// pass when multiple containers all have matching resources + fn pod_resize_multiple_containers_complete() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + limits: + cpu: "400m" + - name: sidecar + image: busybox + resources: + requests: + memory: "64Mi" + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + limits: + cpu: "400m" + - name: sidecar + image: busybox + resources: + requests: + memory: "64Mi" + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(is_pod_resized().matches_object(Some(&p))) + } + + #[test] + /// fail when one of multiple containers has mismatched resources + fn pod_resize_multiple_containers_partial() { + use super::{Condition, is_pod_resized}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + name: resize-demo + namespace: default + spec: + containers: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + - name: sidecar + image: busybox + resources: + requests: + memory: "128Mi" + status: + phase: Running + containerStatuses: + - name: app + image: alpine:3.23 + resources: + requests: + cpu: "200m" + - name: sidecar + image: busybox + resources: + requests: + memory: "64Mi" + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(!is_pod_resized().matches_object(Some(&p))) + } + #[test] /// pass if job completed fn job_completed_ok() {