diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml
index 4db452ad5..a84fb1cb5 100644
--- a/kube-runtime/Cargo.toml
+++ b/kube-runtime/Cargo.toml
@@ -56,6 +56,9 @@ schemars.workspace = true
 tracing-subscriber.workspace = true
 k8s-openapi= { workspace = true, features = ["latest"] }
 dhat.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+tower-test.workspace = true
 
 [[bench]]
 name = "memory"
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 933b7ca44..8c9a19c61 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -118,6 +118,15 @@ impl Action {
     pub const fn await_change() -> Self {
         Self { requeue_after: None }
     }
+
+    /// Whether this action requests a requeue (rather than awaiting a change).
+    ///
+    /// Used by the [`finalizer`](crate::finalizer) helper to tell apart a completed
+    /// cleanup ([`Action::await_change`]) from one that is still in progress
+    /// ([`Action::requeue`]).
+    pub(crate) const fn wants_requeue(&self) -> bool {
+        self.requeue_after.is_some()
+    }
 }
 
 /// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
diff --git a/kube-runtime/src/finalizer.rs b/kube-runtime/src/finalizer.rs
index 42b883f70..6744cbfe8 100644
--- a/kube-runtime/src/finalizer.rs
+++ b/kube-runtime/src/finalizer.rs
@@ -96,6 +96,15 @@ impl FinalizerState {
 /// 12. `finalizer` removes `finalizer_name` from [`ObjectMeta::finalizers`]
 /// 13. Kubernetes sees that all [`ObjectMeta::finalizers`] are gone and finally deletes the object
 ///
+/// # Long-running cleanup
+///
+/// [`Event::Cleanup`] may need to wait for a background process (such as draining or an
+/// external deletion) that takes a long time to finish. Rather than blocking the
+/// reconciliation slot, or returning an `Err` (which surfaces as [`Error::CleanupFailed`]), cleanup can
+/// return [`Action::requeue`] to signal that it is still in progress: the finalizer is *kept*
+/// and the controller re-runs [`Event::Cleanup`] after the requested delay. The finalizer is
+/// only removed (allowing deletion to proceed) once cleanup returns [`Action::await_change`].
+///
 /// # Guarantees
 ///
 /// If [`Event::Apply`] is ever started then [`Event::Cleanup`] must succeed before the Kubernetes object deletion completes.
@@ -152,6 +161,13 @@ where
                 .await
                 // Short-circuit, so that we keep the finalizer if cleanup fails
                 .map_err(Error::CleanupFailed)?;
+            // A requeue means the cleanup is still in progress (e.g. waiting on a
+            // background process), so we keep the finalizer and let the controller
+            // re-run `Cleanup` later. Only `Action::await_change` signals that cleanup
+            // has finished and the finalizer can be removed.
+            if action.wants_requeue() {
+                return Ok(action);
+            }
             // Cleanup was successful, remove the finalizer so that deletion can continue
             let finalizer_path = format!("/metadata/finalizers/{finalizer_i}");
             api.patch::<K>(
@@ -244,6 +260,11 @@ pub enum Event<K> {
     Apply(Arc<K>),
     /// The object is being deleted, and the reconciler should remove all resources that it owns.
     ///
+    /// Return [`Action::await_change`] once cleanup is complete to let the finalizer be removed and
+    /// deletion proceed. If cleanup is still in progress (for example, waiting on a long-running
+    /// background process), return [`Action::requeue`] instead: the finalizer is kept and `Cleanup`
+    /// is re-run after the requested delay, without blocking the reconciliation slot.
+    ///
     /// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
     ///
     /// - The controller is restarted while the deletion is in progress
@@ -252,3 +273,281 @@ pub enum Event<K> {
     /// - The grinch's heart grows a size or two
     Cleanup(Arc<K>),
 }
+
+#[cfg(test)]
+mod tests {
+    use super::{Event, finalizer};
+    use crate::controller::Action;
+    use http::{Request, Response};
+    use http_body_util::BodyExt;
+    use k8s_openapi::api::core::v1::ConfigMap;
+    use kube_client::{Api, Client, client::Body};
+    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
+    use tower_test::mock;
+
+    const FINALIZER: &str = "test/finalizer";
+
+    /// Collect a request's body and parse it as the JSON patch the finalizer sent.
+    async fn json_patch_body(request: Request<Body>) -> serde_json::Value {
+        let bytes = request.into_body().collect().await.unwrap().to_bytes();
+        serde_json::from_slice(&bytes).unwrap()
+    }
+
+    /// A `ConfigMap` that is being deleted and still carries our finalizer.
+    fn deleting_cm() -> ConfigMap {
+        serde_json::from_value(serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": {
+                "name": "test",
+                "namespace": "default",
+                "deletionTimestamp": "2024-01-01T00:00:00Z",
+                "finalizers": [FINALIZER],
+            },
+        }))
+        .unwrap()
+    }
+
+    /// A live `ConfigMap` that already carries our finalizer (the steady-state Apply case).
+    fn applied_cm() -> ConfigMap {
+        serde_json::from_value(serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": {
+                "name": "test",
+                "namespace": "default",
+                "finalizers": [FINALIZER],
+            },
+        }))
+        .unwrap()
+    }
+
+    // With no finalizer present on a live object, the helper patches one in (and does not run
+    // reconcile yet, since the patch triggers a fresh reconciliation), returning `await_change`.
+    #[tokio::test]
+    async fn missing_finalizer_is_added() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        let cm: ConfigMap = serde_json::from_value(serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": { "name": "test", "namespace": "default" },
+        }))
+        .unwrap();
+
+        let server = tokio::spawn(async move {
+            let mut handle = pin!(handle);
+            let (request, send) = handle
+                .next_request()
+                .await
+                .expect("finalizer should be patched in");
+            assert_eq!(request.method(), http::Method::PATCH);
+            assert_eq!(
+                request.uri().to_string(),
+                "/api/v1/namespaces/default/configmaps/test?"
+            );
+            assert_eq!(
+                request.headers().get(http::header::CONTENT_TYPE).unwrap(),
+                "application/json-patch+json"
+            );
+            // Respond with the object now carrying the finalizer
+            let patched: ConfigMap = serde_json::from_value(serde_json::json!({
+                "apiVersion": "v1",
+                "kind": "ConfigMap",
+                "metadata": { "name": "test", "namespace": "default", "finalizers": [FINALIZER] },
+            }))
+            .unwrap();
+            send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
+        });
+
+        let action = finalizer(&api, FINALIZER, Arc::new(cm), |_event| async move {
+            panic!("reconcile must not run before the finalizer is added");
+            #[allow(unreachable_code)]
+            Ok::<_, Infallible>(Action::await_change())
+        })
+        .await
+        .expect("adding the finalizer should succeed");
+
+        assert_eq!(action, Action::await_change());
+        server.await.unwrap();
+    }
+
+    // With the finalizer already present and the object not being deleted, the finalizer runs
+    // `Apply` and passes its action straight through, without touching the API. The mock handle
+    // is dropped, so any request would error and fail the test.
+    #[tokio::test]
+    async fn apply_runs_reconcile_and_passes_action_through() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        drop(handle); // any API call now errors
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        let action = finalizer(&api, FINALIZER, Arc::new(applied_cm()), |event| async move {
+            assert!(matches!(event, Event::Apply(_)), "expected an Apply event");
+            Ok::<_, Infallible>(Action::requeue(Duration::from_secs(60)))
+        })
+        .await
+        .expect("finalizer should not have called the API");
+
+        assert_eq!(action, Action::requeue(Duration::from_secs(60)));
+    }
+
+    // A `Cleanup` that returns `Action::requeue` is "still in progress": the finalizer must be
+    // kept and the API must not be touched. The mock handle is dropped, so any request the
+    // finalizer makes would error and fail the test.
+    #[tokio::test]
+    async fn cleanup_requeue_keeps_finalizer_without_patching() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        drop(handle); // any API call now errors
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        let action = finalizer(&api, FINALIZER, Arc::new(deleting_cm()), |event| async move {
+            assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
+            Ok::<_, Infallible>(Action::requeue(Duration::from_secs(300)))
+        })
+        .await
+        .expect("finalizer should not have called the API");
+
+        assert_eq!(action, Action::requeue(Duration::from_secs(300)));
+    }
+
+    // A `Cleanup` that returns `Action::await_change` is complete: the finalizer issues a JSON
+    // patch removing the finalizer entry, then returns the action.
+    #[tokio::test]
+    async fn cleanup_await_change_removes_finalizer() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        let server = tokio::spawn(async move {
+            let mut handle = pin!(handle);
+            let (request, send) = handle.next_request().await.expect("finalizer should patch");
+            assert_eq!(request.method(), http::Method::PATCH);
+            assert_eq!(
+                request.uri().to_string(),
+                "/api/v1/namespaces/default/configmaps/test?"
+            );
+            assert_eq!(
+                request.headers().get(http::header::CONTENT_TYPE).unwrap(),
+                "application/json-patch+json"
+            );
+            // Respond with the (now finalizer-free) object
+            let patched: ConfigMap = serde_json::from_value(serde_json::json!({
+                "apiVersion": "v1",
+                "kind": "ConfigMap",
+                "metadata": { "name": "test", "namespace": "default" },
+            }))
+            .unwrap();
+            send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
+        });
+
+        let action = finalizer(&api, FINALIZER, Arc::new(deleting_cm()), |event| async move {
+            assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
+            Ok::<_, Infallible>(Action::await_change())
+        })
+        .await
+        .expect("finalizer removal should succeed");
+
+        assert_eq!(action, Action::await_change());
+        server.await.unwrap();
+    }
+
+    // Adding our finalizer to an object that already has others must append (not overwrite),
+    // guarded by a `test` op on the existing list so a concurrent change forces a retry.
+    #[tokio::test]
+    async fn finalizer_is_appended_to_existing() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        let cm: ConfigMap = serde_json::from_value(serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer"] },
+        }))
+        .unwrap();
+
+        let server = tokio::spawn(async move {
+            let mut handle = pin!(handle);
+            let (request, send) = handle
+                .next_request()
+                .await
+                .expect("finalizer should be patched in");
+            assert_eq!(request.method(), http::Method::PATCH);
+            assert_eq!(
+                json_patch_body(request).await,
+                serde_json::json!([
+                    { "op": "test", "path": "/metadata/finalizers", "value": ["other/finalizer"] },
+                    { "op": "add", "path": "/metadata/finalizers/-", "value": FINALIZER },
+                ])
+            );
+            let patched: ConfigMap = serde_json::from_value(serde_json::json!({
+                "apiVersion": "v1",
+                "kind": "ConfigMap",
+                "metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer", FINALIZER] },
+            }))
+            .unwrap();
+            send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
+        });
+
+        let action = finalizer(&api, FINALIZER, Arc::new(cm), |_event| async move {
+            panic!("reconcile must not run before the finalizer is added");
+            #[allow(unreachable_code)]
+            Ok::<_, Infallible>(Action::await_change())
+        })
+        .await
+        .expect("appending the finalizer should succeed");
+
+        assert_eq!(action, Action::await_change());
+        server.await.unwrap();
+    }
+
+    // When cleanup completes and our finalizer sits among others, removal must target its exact
+    // index, guarded by a `test` op so we never remove someone else's finalizer.
+    #[tokio::test]
+    async fn cleanup_removes_finalizer_at_its_index() {
+        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
+        let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));
+
+        // Our finalizer is at index 1.
+        let cm: ConfigMap = serde_json::from_value(serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": {
+                "name": "test",
+                "namespace": "default",
+                "deletionTimestamp": "2024-01-01T00:00:00Z",
+                "finalizers": ["other/finalizer", FINALIZER],
+            },
+        }))
+        .unwrap();
+
+        let server = tokio::spawn(async move {
+            let mut handle = pin!(handle);
+            let (request, send) = handle.next_request().await.expect("finalizer should be removed");
+            assert_eq!(request.method(), http::Method::PATCH);
+            assert_eq!(
+                json_patch_body(request).await,
+                serde_json::json!([
+                    { "op": "test", "path": "/metadata/finalizers/1", "value": FINALIZER },
+                    { "op": "remove", "path": "/metadata/finalizers/1" },
+                ])
+            );
+            let patched: ConfigMap = serde_json::from_value(serde_json::json!({
+                "apiVersion": "v1",
+                "kind": "ConfigMap",
+                "metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer"] },
+            }))
+            .unwrap();
+            send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
+        });
+
+        let action = finalizer(&api, FINALIZER, Arc::new(cm), |event| async move {
+            assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
+            Ok::<_, Infallible>(Action::await_change())
+        })
+        .await
+        .expect("finalizer removal should succeed");
+
+        assert_eq!(action, Action::await_change());
+        server.await.unwrap();
+    }
+}