From 7c0e0575e7eee832dc7ebefeb9a6b697ef3453e8 Mon Sep 17 00:00:00 2001
From: Harald Gutmann
Date: Thu, 14 Aug 2025 17:49:12 +0200
Subject: [PATCH] feat(tests): enhance troubleshooting outputs

log deployment status, replicaset status, pod status and logs or
daemonset status, pod status and logs in case of a rollout timeout

use kube instead of kubectl where feasible

Signed-off-by: Harald Gutmann
---
 Cargo.lock                                  |  96 ++++
 tests-integration/Cargo.toml                |   1 +
 .../src/infrastructure/kind_cluster.rs      | 439 ++++++++++++++----
 .../src/infrastructure/kustomize.rs         |  59 +--
 tests-integration/src/infrastructure/mod.rs |  23 +-
 5 files changed, 470 insertions(+), 148 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 96bdc5ff..d642ee4a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -45,6 +45,21 @@ version = "0.2.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
 
+[[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
+name = "android_system_properties"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "anstream"
 version = "0.6.20"
@@ -327,8 +342,11 @@ version = "0.4.41"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
 dependencies = [
+ "android-tzdata",
+ "iana-time-zone",
  "num-traits",
  "serde",
+ "windows-link",
 ]
 
 [[package]]
@@ -1060,6 +1078,30 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "iana-time-zone"
+version = "0.1.63"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
+dependencies = [
+ "android_system_properties",
+ "core-foundation-sys",
+ "iana-time-zone-haiku",
+ "js-sys",
+ "log",
+ "wasm-bindgen",
+ "windows-core",
+]
+
+[[package]]
+name = "iana-time-zone-haiku"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "ident_case"
 version = "1.0.1"
@@ -2186,6 +2228,7 @@ dependencies = [
 name = "tests-integration"
 version = "0.3.0"
 dependencies = [
+ "chrono",
  "k8s-openapi",
  "kube",
  "thiserror 2.0.14",
@@ -2635,12 +2678,65 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "windows-core"
+version = "0.61.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
+dependencies = [
+ "windows-implement",
+ "windows-interface",
+ "windows-link",
+ "windows-result",
+ "windows-strings",
+]
+
+[[package]]
+name = "windows-implement"
+version = "0.60.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "windows-interface"
+version = "0.59.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "windows-link"
 version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
 
+[[package]]
+name = "windows-result"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
+name = "windows-strings"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
+dependencies = [
+ "windows-link",
+]
+
 [[package]]
 name = "windows-sys"
 version = "0.52.0"
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index a7340cbd..efb66ea5 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -7,6 +7,7 @@ version.workspace = true
 publish = false
 
 [dependencies]
+chrono = { workspace = true, features = ["clock"] }
 k8s-openapi= { workspace = true, features = ["v1_32"] }
 kube = { workspace = true, features = ["client", "rustls-tls", "ring"] }
 thiserror.workspace = true
diff --git a/tests-integration/src/infrastructure/kind_cluster.rs b/tests-integration/src/infrastructure/kind_cluster.rs
index 03f7eea2..f3e198b6 100644
--- a/tests-integration/src/infrastructure/kind_cluster.rs
+++ b/tests-integration/src/infrastructure/kind_cluster.rs
@@ -14,17 +14,26 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-use std::time::Duration;
+use std::collections::BTreeMap;
+use std::ops::Add;
+use std::time::{Duration, Instant};
 
+use chrono::Utc;
+use k8s_openapi::api::apps::v1::{
+    DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec, ReplicaSet,
+};
+use k8s_openapi::api::core::v1::{Container, Pod, PodSpec, PodTemplateSpec};
+use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
+use k8s_openapi::chrono;
+use kube::api::{ListParams, LogParams, Patch, PatchParams};
 use kube::config::KubeConfigOptions;
-use kube::{Client, Config};
+use kube::core::Selector;
+use kube::{Api, Client, Config};
 use thiserror::Error as ThisError;
+use tokio::time::sleep;
 use tracing::{error, info};
 
-use crate::Result;
-use crate::infrastructure::{
-    AsyncCommand, AsyncCommandError, ContainerState, Workload, WorkloadImageTag,
-};
+use crate::infrastructure::{AsyncCommand, AsyncCommandError, Workload, WorkloadImageTag};
 
 /// Single-node kind cluster.
 #[derive(Clone, Debug)]
@@ -38,16 +47,24 @@ pub struct KindCluster {
 pub enum KindClusterError {
     #[error("{0}: {1}")]
     Execution(String, AsyncCommandError),
-    #[error("{0} container state {1:?}")]
-    ContainerState(String, ContainerState),
+    #[error("{0}")]
+    Rollout(String),
+    #[error("kube client error: {0}")]
+    Client(#[from] Box<kube::Error>),
     #[error("Failed to create client {1} for k8s context {0:?}")]
-    Client(String, String),
-    #[error("Could not determine container id for ter {0}")]
-    NotFound(String),
+    Config(String, String),
     #[error("loading image: {1}:{2} to cluster {0} failed: {3}")]
     LoadImage(String, String, String, String),
 }
 
+impl From<kube::Error> for KindClusterError {
+    fn from(value: kube::Error) -> Self {
+        Box::new(value).into()
+    }
+}
+
+pub type Result<T> = std::result::Result<T, KindClusterError>;
+
 impl KindCluster {
     /// create a new cluster
    pub fn new<T: Into<String>>(name: T) -> Result<Self> {
@@ -75,10 +92,10 @@ impl KindCluster {
         };
         let cfg = Config::from_kubeconfig(&kube_config)
             .await
-            .map_err(|e| KindClusterError::Client(self.k8s_context(), e.to_string()))?;
+            .map_err(|e| KindClusterError::Config(self.k8s_context(), e.to_string()))?;
 
         let client = Client::try_from(cfg)
-            .map_err(|e| KindClusterError::Client(self.k8s_context(), e.to_string()))?;
+            .map_err(|e| KindClusterError::Config(self.k8s_context(), e.to_string()))?;
 
         Ok(client)
     }
@@ -93,7 +110,6 @@ impl KindCluster {
                 format!("Failed to create kind cluster {}", self.name),
                 e,
             )
-            .into()
         })
     }
 
@@ -107,7 +123,6 @@ impl KindCluster {
                 format!("Failed to create kind cluster {}", self.name),
                 e,
             )
-            .into()
         })
     }
 
@@ -134,7 +149,6 @@ impl KindCluster {
                 tag.to_string(),
                 e.to_string(),
             )
-            .into()
         })
     }
 
@@ -146,59 +160,66 @@ impl KindCluster {
         workload: T,
         wait_status: Option<Duration>,
     ) -> Result<()> {
+        let client = self.k8s_client().await?;
+        let pp = PatchParams::apply("blixt-integration-tests");
+
         let workload = workload.as_ref();
-        let k8s_ctx = self.k8s_context();
+        let (namespace, name) = workload.id.namespace_name();
 
-        let (workload_type, namespace, name) = workload.workload_namespace_name();
+        match &workload.id {
+            Workload::DaemonSet(_) => {
+                let daemonset_api = Api::<DaemonSet>::namespaced(client.clone(), namespace);
+                let daemonset = daemonset_api.get(name).await?;
 
-        // update deployment image references in case specified
-        if let Some((image, tag)) = workload.image_tag() {
-            info!(
-                "Updating image {image} with tag {tag} for rollout {}.",
-                workload.id
-            );
-            AsyncCommand::new(
-                "kubectl",
-                &[
-                    format!("--context={k8s_ctx}").as_str(),
-                    "set",
-                    "image",
-                    "-n",
-                    namespace,
-                    format!("{workload_type}/{name}").as_str(),
-                    format!("*={image}:{tag}").as_str(),
-                ],
-            )
-            .run()
-            .await
-            .map_err(|e| {
-                KindClusterError::Execution(
-                    format!("Failed to set image for {namespace} {workload_type}/{name} failed"),
-                    e,
-                )
-            })?;
-        }
+                let Some(spec) = daemonset.spec.unwrap_or_default().template.spec else {
+                    return Err(KindClusterError::Rollout(format!(
+                        "{} does not contain .spec.template.spec",
+                        workload.id
+                    )));
+                };
 
-        info!("Restarting rollout {}.", workload.id);
-        AsyncCommand::new(
-            "kubectl",
-            &[
-                format!("--context={k8s_ctx}").as_str(),
-                "rollout",
-                "restart",
-                "-n",
-                namespace,
-                format!("{workload_type}/{name}").as_str(),
-            ],
-        )
-        .run()
-        .await
-        .map_err(|e| {
-            KindClusterError::Execution(
-                format!("Rollout restart for {namespace} {workload_type}/{name} failed"),
-                e,
-            )
-        })?;
+                let patch = DaemonSet {
+                    metadata: ObjectMeta {
+                        name: Some(name.to_string()),
+                        ..Default::default()
+                    },
+                    spec: Some(DaemonSetSpec {
+                        template: Self::container_image_update_rollout_patch(spec, workload),
+                        ..Default::default()
+                    }),
+                    ..Default::default()
+                };
+
+                let patch = Patch::Strategic(&patch);
+                daemonset_api.patch(name, &pp, &patch).await?;
+            }
+            Workload::Deployment(_) => {
+                let deployment_api = Api::<Deployment>::namespaced(client.clone(), namespace);
+                let deployment = deployment_api.get(name).await?;
+
+                let Some(spec) = deployment.spec.unwrap_or_default().template.spec else {
+                    return Err(KindClusterError::Rollout(format!(
+                        "{} does not contain .spec.template.spec",
+                        workload.id
+                    )));
+                };
+
+                let patch = Deployment {
+                    metadata: ObjectMeta {
+                        name: Some(name.to_string()),
+                        ..Default::default()
+                    },
+                    spec: Some(DeploymentSpec {
+                        template: Self::container_image_update_rollout_patch(spec, workload),
+                        ..Default::default()
+                    }),
+                    ..Default::default()
+                };
+
+                let patch = Patch::Strategic(&patch);
+                deployment_api.patch(name, &pp, &patch).await?;
+            }
+        }
 
         if let Some(wait_status) = wait_status {
             self.rollout_status(&workload.id, wait_status).await?;
@@ -207,6 +228,64 @@ impl KindCluster {
         Ok(())
     }
 
+    fn container_image_update_rollout_patch(
+        spec: PodSpec,
+        workload: &WorkloadImageTag,
+    ) -> PodTemplateSpec {
+        let mut container_patches = vec![];
+
+        // update deployment image references in case specified
+        if let Some(image_tag) = &workload.image_tag {
+            for container in spec.containers {
+                if let Some(container_image) = &container.image {
+                    let update_image = format!("{}:{}", image_tag.image, image_tag.tag);
+                    if container_image != &update_image {
+                        container_patches.push(Container {
+                            image: Some(update_image),
+                            name: container.name.clone(),
+                            ..Default::default()
+                        })
+                    };
+                }
+            }
+        }
+
+        let mut annotations = BTreeMap::new();
+        annotations.insert(
+            "blixt.integration.tests/restartedAt".to_string(),
+            Utc::now().to_rfc3339(),
+        );
+
+        if container_patches.is_empty() {
+            info!("Requesting rollout for {}.", workload.id);
+            // only update annotations to ensure rollout is triggered
+            PodTemplateSpec {
+                metadata: Some(ObjectMeta {
+                    annotations: Some(annotations),
+                    ..Default::default()
+                }),
+                ..Default::default()
+            }
+        } else {
+            if let Some(image_tag) = &workload.image_tag {
+                info!(
+                    "Updating image for {} to {}:{}",
+                    workload.id, image_tag.image, image_tag.tag
+                );
+            }
+            PodTemplateSpec {
+                metadata: Some(ObjectMeta {
+                    annotations: Some(annotations),
+                    ..Default::default()
+                }),
+                spec: Some(PodSpec {
+                    containers: container_patches,
+                    ..Default::default()
+                }),
+            }
+        }
+    }
+
     /// In case wait_status is `None` the rollouts will not wait for success
     /// In case wait_status is `Some(Duration)` the rollouts are waiting for success with
     /// the duration as timeout in seconds per rollout
@@ -228,36 +307,208 @@ impl KindCluster {
         workload: T,
         timeout_secs: Duration,
     ) -> Result<()> {
-        let k8s_ctx = self.k8s_context();
-        let timeout = timeout_secs.as_secs().to_string();
+        let client = self.k8s_client().await?;
         let workload = workload.as_ref();
+        let (namespace, name) = workload.namespace_name();
 
-        info!(
-            "Waiting for rollout {} to complete. Timeout: {}s",
-            workload, timeout
-        );
-        let (workload_type, namespace, name) = workload.workload_namespace_name();
-        AsyncCommand::new(
-            "kubectl",
-            &[
-                format!("--context={k8s_ctx}").as_str(),
-                "rollout",
-                "status",
-                "-n",
-                namespace,
-                format!("{workload_type}/{name}").as_str(),
-                "--timeout",
-                format!("{timeout}s").as_str(),
-            ],
-        )
-        .run()
-        .await
-        .map_err(|e| {
-            KindClusterError::Execution(
-                format!("Rollout status for {namespace} {workload_type}/{name} failed"),
-                e,
-            )
-            .into()
-        })
+        let start_time = Instant::now();
+        'watch: while start_time.elapsed() <= timeout_secs.add(Duration::from_secs(1)) {
+            // wait first to avoid potentially getting old rollout details
+            sleep(Duration::from_secs(1)).await;
+
+            let rollout_success = match workload {
+                Workload::DaemonSet(_) => {
+                    Self::rollout_status_daemonset(
+                        client.clone(),
+                        namespace,
+                        name,
+                        &start_time,
+                        &timeout_secs,
+                    )
+                    .await?
+                }
+                Workload::Deployment(_) => {
+                    Self::rollout_status_deployment(
+                        client.clone(),
+                        namespace,
+                        name,
+                        &start_time,
+                        &timeout_secs,
+                    )
+                    .await?
+                }
+            };
+
+            if rollout_success {
+                info!("Rollout for {workload} was successful.");
+                break 'watch;
+            } else {
+                info!(
+                    "Waiting for {workload} rollout to complete (elapsed: {:?}s, timeout: {timeout_secs:?}).",
+                    start_time.elapsed().as_secs()
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn rollout_status_deployment(
+        client: Client,
+        namespace: &str,
+        name: &str,
+        start_time: &Instant,
+        timeout: &Duration,
+    ) -> Result<bool> {
+        let deployment_api = Api::<Deployment>::namespaced(client, namespace);
+        let deployment = deployment_api.get(name).await?;
+        let Some(status) = deployment.status.clone() else {
+            return Ok(false);
+        };
+        let Some(deployment_revision) = deployment
+            .metadata
+            .annotations
+            .clone()
+            .unwrap_or_default()
+            .remove("deployment.kubernetes.io/revision")
+        else {
+            return Ok(false);
+        };
+
+        // locate corresponding ReplicaSet
+        let lp = if let Some(labels) = deployment.metadata.labels {
+            ListParams::default().labels_from(&Selector::from_iter(labels))
+        } else {
+            ListParams::default()
+        };
+
+        let replicaset_api = Api::<ReplicaSet>::namespaced(deployment_api.into_client(), namespace);
+        let replicasets = replicaset_api.list(&lp).await?;
+
+        let Some(replicaset) = replicasets.into_iter().find(|replicaset| {
+            let replicaset_revision = replicaset
+                .metadata
+                .annotations
+                .clone()
+                .unwrap_or_default()
+                .remove("deployment.kubernetes.io/revision")
+                .unwrap_or_default();
+            replicaset_revision == deployment_revision
+        }) else {
+            return Ok(false);
+        };
+
+        let pod_api = Api::<Pod>::namespaced(replicaset_api.into_client(), namespace);
+        let replicaset_labels = replicaset.metadata.labels.clone().unwrap_or_default();
+        // ReplicaSet labels contain pod-template-hash to identify corresponding pods
+        let lp = ListParams::default().labels_from(&Selector::from_iter(replicaset_labels));
+        let pods = pod_api.list(&lp).await?.items;
+
+        if &start_time.elapsed() >= timeout {
+            error!("Deployment {namespace}/{name} rollout timed out.",);
+            error!("{:?}", status);
+            error!("{:?}", replicaset.status);
+
+            for pod in pods {
+                Self::error_pod_details(&pod_api, pod).await?;
+            }
+
+            Err(KindClusterError::Rollout(format!(
+                "Deployment {namespace}/{name} rollout timed out."
+            )))
+        } else if pods.is_empty() {
+            Ok(false)
+        } else {
+            let pods_running = pods.iter().all(|p| {
+                p.status
+                    .clone()
+                    .unwrap_or_default()
+                    .phase
+                    .unwrap_or_default()
+                    == "Running"
+            });
+            let deployment_ready = status.ready_replicas >= status.replicas;
+            Ok(deployment_ready && pods_running)
+        }
+    }
+
+    async fn rollout_status_daemonset(
+        client: Client,
+        namespace: &str,
+        name: &str,
+        start_time: &Instant,
+        timeout: &Duration,
+    ) -> Result<bool> {
+        let daemonset_api = Api::<DaemonSet>::namespaced(client, namespace);
+        let daemonset = daemonset_api.get(name).await?;
+        let Some(status) = daemonset.status.clone() else {
+            return Ok(false);
+        };
+
+        let daemonset_generation = daemonset
+            .metadata
+            .generation
+            .unwrap_or_default()
+            .to_string();
+
+        let lp = ListParams::default().labels_from(&Selector::from_iter(
+            daemonset.metadata.labels.unwrap_or_default(),
+        ));
+        let pod_api = Api::<Pod>::namespaced(daemonset_api.into_client(), namespace);
+        let pods = pod_api.list(&lp).await?;
+
+        let pods = pods
+            .items
+            .into_iter()
+            .filter(|p| {
+                let mut labels = p.metadata.labels.clone().unwrap_or_default();
+                let pod_template_generation =
+                    labels.remove("pod-template-generation").unwrap_or_default();
+                pod_template_generation == daemonset_generation
+            })
+            .collect::<Vec<_>>();
+
+        if &start_time.elapsed() >= timeout {
+            error!("DaemonSet {namespace}/{name} rollout timed out.",);
+            error!("{:?}", status);
+
+            for pod in pods {
+                Self::error_pod_details(&pod_api, pod).await?;
+            }
+
+            Err(KindClusterError::Rollout(format!(
+                "DaemonSet {namespace}/{name} rollout timed out."
+            )))
+        } else if pods.is_empty() {
+            Ok(false)
+        } else {
+            let pods_running = pods.iter().all(|p| {
+                p.status
+                    .clone()
+                    .unwrap_or_default()
+                    .phase
+                    .unwrap_or_default()
+                    == "Running"
+            });
+            let daemonset_ready = status.number_ready >= status.desired_number_scheduled;
+            Ok(daemonset_ready && pods_running)
+        }
+    }
+
+    /// log pod status and pod logs
+    async fn error_pod_details(pod_api: &Api<Pod>, pod: Pod) -> Result<()> {
+        if let Some(status) = pod.status {
+            error!("{:?}", status);
+        }
+        if let Some(name) = pod.metadata.name {
+            let lp = LogParams {
+                tail_lines: Some(1024),
+                ..Default::default()
+            };
+
+            let pod_logs = pod_api.logs(name.as_str(), &lp).await?;
+            error!("{pod_logs}")
+        }
+        Ok(())
+    }
 }
diff --git a/tests-integration/src/infrastructure/kustomize.rs b/tests-integration/src/infrastructure/kustomize.rs
index b7da82fe..00247586 100644
--- a/tests-integration/src/infrastructure/kustomize.rs
+++ b/tests-integration/src/infrastructure/kustomize.rs
@@ -44,7 +44,6 @@ pub struct KustomizeDeployments {
 
 enum KustomizeKind {
     Directory(PathBuf),
-    File(PathBuf),
     Https(String),
 }
 
@@ -70,25 +69,15 @@ impl KustomizeDeployments {
         let k8s_ctx = self.cluster.k8s_context();
         for deployment in &self.kustomizations {
             let inner = deployment.inner();
-            match deployment.needs_k() {
-                true => AsyncCommand::new(
-                    "kubectl",
-                    &[
-                        format!("--context={k8s_ctx}").as_str(),
-                        "apply",
-                        "-k",
-                        inner.as_str(),
-                    ],
-                ),
-                false => AsyncCommand::new(
-                    "kubectl",
-                    &[
-                        format!("--context={k8s_ctx}").as_str(),
-                        "apply",
-                        inner.as_str(),
-                    ],
-                ),
-            }
+            AsyncCommand::new(
+                "kubectl",
+                &[
+                    format!("--context={k8s_ctx}").as_str(),
+                    "apply",
+                    "--kustomize",
+                    inner.as_str(),
+                ],
+            )
             .run()
             .await
             .map_err(KustomizeError::Apply)?;
@@ -99,7 +88,7 @@ impl KustomizeDeployments {
 }
 
 impl KustomizeKind {
-    async fn try_from<D: AsRef<str>>(kustomization: D) -> Result<Self> {
+    async fn try_from<K: AsRef<str>>(kustomization: K) -> Result<Self> {
         let kustomization = kustomization.as_ref();
 
         let kind = match kustomization {
@@ -110,10 +99,10 @@ impl KustomizeKind {
                 let path = kustomization.to_string();
                 let fs_path = Path::new(&path);
 
-                let exists = fs_path
+                if !fs_path
                     .try_exists()
-                    .map_err(|e| KustomizeError::InvalidPath(path.to_string(), e.to_string()))?;
-                if !exists {
+                    .map_err(|e| KustomizeError::InvalidPath(path.to_string(), e.to_string()))?
+                {
                     return Err(KustomizeError::InvalidPath(
                         path.to_string(),
                         "does not exist".to_string(),
@@ -121,10 +110,15 @@ impl KustomizeKind {
                     .into());
                 }
 
-                match fs_path.is_dir() {
-                    true => KustomizeKind::Directory(fs_path.to_path_buf()),
-                    false => KustomizeKind::File(fs_path.to_path_buf()),
+                if !fs_path.is_dir() {
+                    return Err(KustomizeError::InvalidPath(
+                        path.to_string(),
+                        "is a file, not a directory".to_string(),
+                    )
+                    .into());
                 }
+
+                KustomizeKind::Directory(fs_path.to_path_buf())
             }
         };
 
@@ -133,7 +127,7 @@ impl KustomizeKind {
 
     async fn validate(self) -> Result<Self> {
         match &self {
-            KustomizeKind::Directory(_) | KustomizeKind::File(_) => {
+            KustomizeKind::Directory(_) => {
                 let inner = self.inner();
                 AsyncCommand::new("kubectl", &["kustomize", inner.as_str()])
                     .run()
@@ -147,18 +141,9 @@ impl KustomizeKind {
         Ok(self)
     }
 
-    fn needs_k(&self) -> bool {
-        match self {
-            KustomizeKind::Directory(_) => true,
-            KustomizeKind::File(_) => false,
-            KustomizeKind::Https(_) => true,
-        }
-    }
-
     fn inner(&self) -> String {
         match self {
             KustomizeKind::Directory(d) => d.as_os_str().to_string_lossy().to_string(),
-            KustomizeKind::File(d) => d.as_os_str().to_string_lossy().to_string(),
             KustomizeKind::Https(d) => d.clone(),
         }
     }
diff --git a/tests-integration/src/infrastructure/mod.rs b/tests-integration/src/infrastructure/mod.rs
index 2251aefe..378a9bfd 100644
--- a/tests-integration/src/infrastructure/mod.rs
+++ b/tests-integration/src/infrastructure/mod.rs
@@ -57,7 +57,7 @@ pub enum Workload {
 
 /// Fully qualified image name including tag.
 #[allow(missing_docs)]
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ImageTag {
     /// image fully qualified name
     pub image: String,
@@ -77,22 +77,11 @@ pub struct WorkloadImageTag {
     pub image_tag: Option<ImageTag>,
 }
 
-impl WorkloadImageTag {
-    fn image_tag(&self) -> Option<(&str, &str)> {
-        self.image_tag
-            .as_ref()
-            .map(|it| (it.image.as_str(), it.tag.as_str()))
-    }
-    fn workload_namespace_name(&self) -> (&str, &str, &str) {
-        self.id.workload_namespace_name()
-    }
-}
-
 impl Workload {
-    fn workload_namespace_name(&self) -> (&str, &str, &str) {
+    fn namespace_name(&self) -> (&str, &str) {
         match &self {
-            Workload::DaemonSet(id) => ("daemonset", id.namespace.as_str(), id.name.as_str()),
-            Workload::Deployment(id) => ("deployment", id.namespace.as_str(), id.name.as_str()),
+            Workload::DaemonSet(id) => (id.namespace.as_str(), id.name.as_str()),
+            Workload::Deployment(id) => (id.namespace.as_str(), id.name.as_str()),
         }
     }
 }
@@ -153,11 +142,11 @@ impl Display for Workload {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let id = match self {
             Workload::DaemonSet(id) => {
-                f.write_str("daemonset")?;
+                f.write_str("DaemonSet")?;
                 id
             }
             Workload::Deployment(id) => {
-                f.write_str("deployment")?;
+                f.write_str("Deployment")?;
                 id
             }
         };
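
Usage sketch for the reworked rollout API, for orientation only and not part of the applied diff: `wait_for_rollout` and its arguments are hypothetical test glue, while `KindCluster::rollout_status`, `Workload`, and the timeout behaviour are taken from the patch above.

    use std::time::Duration;

    async fn wait_for_rollout(cluster: &KindCluster, id: &Workload) -> Result<()> {
        // Polls via the kube client once per second; on timeout the helpers
        // above log Deployment/ReplicaSet (or DaemonSet) status, pod status
        // and the last 1024 pod log lines, then return KindClusterError::Rollout.
        cluster.rollout_status(id, Duration::from_secs(120)).await
    }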