Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ schemars.workspace = true
tracing-subscriber.workspace = true
k8s-openapi= { workspace = true, features = ["latest"] }
dhat.workspace = true
http.workspace = true
http-body-util.workspace = true
tower-test.workspace = true

[[bench]]
name = "memory"
Expand Down
9 changes: 9 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ impl Action {
pub const fn await_change() -> Self {
Self { requeue_after: None }
}

/// Whether this action requests a requeue (rather than awaiting a change).
///
/// Used by the [`finalizer`](crate::finalizer) helper to tell apart a completed
/// cleanup ([`Action::await_change`]) from one that is still in progress
/// ([`Action::requeue`]).
pub(crate) const fn wants_requeue(&self) -> bool {
self.requeue_after.is_some()
}
}

/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
Expand Down
299 changes: 299 additions & 0 deletions kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ impl FinalizerState {
/// 12. `finalizer` removes `finalizer_name` from [`ObjectMeta::finalizers`]
/// 13. Kubernetes sees that all [`ObjectMeta::finalizers`] are gone and finally deletes the object
///
/// # Long-running cleanup
///
/// [`Event::Cleanup`] may need to wait for a background process (such as draining or an
/// external deletion) that takes a long time to finish. Rather than blocking the
/// reconciliation slot, or returning an `Err` (which is requeued immediately), cleanup can
/// return [`Action::requeue`] to signal that it is still in progress: the finalizer is *kept*
/// and the controller re-runs [`Event::Cleanup`] after the requested delay. The finalizer is
/// only removed (allowing deletion to proceed) once cleanup returns [`Action::await_change`].
///
/// # Guarantees
///
/// If [`Event::Apply`] is ever started then [`Event::Cleanup`] must succeed before the Kubernetes object deletion completes.
Expand Down Expand Up @@ -152,6 +161,13 @@ where
.await
// Short-circuit, so that we keep the finalizer if cleanup fails
.map_err(Error::CleanupFailed)?;
// A requeue means the cleanup is still in progress (e.g. waiting on a
// background process), so we keep the finalizer and let the controller
// re-run `Cleanup` later. Only `Action::await_change` signals that cleanup
// has finished and the finalizer can be removed.
Comment thread
clux marked this conversation as resolved.
if action.wants_requeue() {
return Ok(action);
}
Comment on lines +164 to +170

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only scary part here is if people are using a blanket Action::requeue(5 minutes) in their reconcilers.

AFAIKT this used to be a common pattern for controllers to have some safety against missed events, and if i am reading this correctly, it would mean we never actually cleanup the finalizer. We would have to communicate this in the release just in case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

searched around a bit and I could only find one controller getting this wrong

what i instead find, is more users doing Action::requeue inside the Event::Cleanup stage - which does not prevent them from having the object deleted before the requeue is started. See e.g.;

as such, despite the small risk, i now think this is a net harm-reduction change because it allows the majority of these requeue requesters to work better.

// Cleanup was successful, remove the finalizer so that deletion can continue
let finalizer_path = format!("/metadata/finalizers/{finalizer_i}");
api.patch::<K>(
Expand Down Expand Up @@ -244,6 +260,11 @@ pub enum Event<K> {
Apply(Arc<K>),
/// The object is being deleted, and the reconciler should remove all resources that it owns.
///
/// Return [`Action::await_change`] once cleanup is complete to let the finalizer be removed and
/// deletion proceed. If cleanup is still in progress (for example, waiting on a long-running
/// background process), return [`Action::requeue`] instead: the finalizer is kept and `Cleanup`
/// is re-run after the requested delay, without blocking the reconciliation slot.
///
/// This must be idempotent, since it may be recalled if, for example (this list is non-exhaustive):
///
/// - The controller is restarted while the deletion is in progress
Expand All @@ -252,3 +273,281 @@ pub enum Event<K> {
/// - The grinch's heart grows a size or two
Cleanup(Arc<K>),
}

#[cfg(test)]
mod tests {
use super::{Event, finalizer};
use crate::controller::Action;
use http::{Request, Response};
use http_body_util::BodyExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{Api, Client, client::Body};
use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
use tower_test::mock;

const FINALIZER: &str = "test/finalizer";

/// Collect a request's body and parse it as the JSON patch the finalizer sent.
async fn json_patch_body(request: Request<Body>) -> serde_json::Value {
let bytes = request.into_body().collect().await.unwrap().to_bytes();
serde_json::from_slice(&bytes).unwrap()
}

/// A `ConfigMap` that is being deleted and still carries our finalizer.
fn deleting_cm() -> ConfigMap {
serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "test",
"namespace": "default",
"deletionTimestamp": "2024-01-01T00:00:00Z",
"finalizers": [FINALIZER],
},
}))
.unwrap()
}

/// A live `ConfigMap` that already carries our finalizer (the steady-state Apply case).
fn applied_cm() -> ConfigMap {
serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "test",
"namespace": "default",
"finalizers": [FINALIZER],
},
}))
.unwrap()
}

// With no finalizer present on a live object, the helper patches one in (and does not run
// reconcile yet, since the patch triggers a fresh reconciliation), returning `await_change`.
#[tokio::test]
async fn missing_finalizer_is_added() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

let cm: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default" },
}))
.unwrap();

let server = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle
.next_request()
.await
.expect("finalizer should be patched in");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
"/api/v1/namespaces/default/configmaps/test?"
);
assert_eq!(
request.headers().get(http::header::CONTENT_TYPE).unwrap(),
"application/json-patch+json"
);
// Respond with the object now carrying the finalizer
let patched: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default", "finalizers": [FINALIZER] },
}))
.unwrap();
send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
});

let action = finalizer(&api, FINALIZER, Arc::new(cm), |_event| async move {
panic!("reconcile must not run before the finalizer is added");
#[allow(unreachable_code)]
Ok::<_, Infallible>(Action::await_change())
})
.await
.expect("adding the finalizer should succeed");

assert_eq!(action, Action::await_change());
server.await.unwrap();
}

// With the finalizer already present and the object not being deleted, the finalizer runs
// `Apply` and passes its action straight through, without touching the API. The mock handle
// is dropped, so any request would error and fail the test.
#[tokio::test]
async fn apply_runs_reconcile_and_passes_action_through() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
drop(handle); // any API call now errors
Comment on lines +376 to +382

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are great 👍

let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

let action = finalizer(&api, FINALIZER, Arc::new(applied_cm()), |event| async move {
assert!(matches!(event, Event::Apply(_)), "expected an Apply event");
Ok::<_, Infallible>(Action::requeue(Duration::from_secs(60)))
})
.await
.expect("finalizer should not have called the API");

assert_eq!(action, Action::requeue(Duration::from_secs(60)));
}

// A `Cleanup` that returns `Action::requeue` is "still in progress": the finalizer must be
// kept and the API must not be touched. The mock handle is dropped, so any request the
// finalizer makes would error and fail the test.
#[tokio::test]
async fn cleanup_requeue_keeps_finalizer_without_patching() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
drop(handle); // any API call now errors
let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

let action = finalizer(&api, FINALIZER, Arc::new(deleting_cm()), |event| async move {
assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
Ok::<_, Infallible>(Action::requeue(Duration::from_secs(300)))
})
.await
.expect("finalizer should not have called the API");

assert_eq!(action, Action::requeue(Duration::from_secs(300)));
}

// A `Cleanup` that returns `Action::await_change` is complete: the finalizer issues a JSON
// patch removing the finalizer entry, then returns the action.
#[tokio::test]
async fn cleanup_await_change_removes_finalizer() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

let server = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("finalizer should patch");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().to_string(),
"/api/v1/namespaces/default/configmaps/test?"
);
assert_eq!(
request.headers().get(http::header::CONTENT_TYPE).unwrap(),
"application/json-patch+json"
);
// Respond with the (now finalizer-free) object
let patched: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default" },
}))
.unwrap();
send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
});

let action = finalizer(&api, FINALIZER, Arc::new(deleting_cm()), |event| async move {
assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
Ok::<_, Infallible>(Action::await_change())
})
.await
.expect("finalizer removal should succeed");

assert_eq!(action, Action::await_change());
server.await.unwrap();
}

// Adding our finalizer to an object that already has others must append (not overwrite),
// guarded by a `test` op on the existing list so a concurrent change forces a retry.
#[tokio::test]
async fn finalizer_is_appended_to_existing() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

let cm: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer"] },
}))
.unwrap();

let server = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle
.next_request()
.await
.expect("finalizer should be patched in");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
json_patch_body(request).await,
serde_json::json!([
{ "op": "test", "path": "/metadata/finalizers", "value": ["other/finalizer"] },
{ "op": "add", "path": "/metadata/finalizers/-", "value": FINALIZER },
])
);
let patched: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer", FINALIZER] },
}))
.unwrap();
send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
});

let action = finalizer(&api, FINALIZER, Arc::new(cm), |_event| async move {
panic!("reconcile must not run before the finalizer is added");
#[allow(unreachable_code)]
Ok::<_, Infallible>(Action::await_change())
})
.await
.expect("appending the finalizer should succeed");

assert_eq!(action, Action::await_change());
server.await.unwrap();
}

// When cleanup completes and our finalizer sits among others, removal must target its exact
// index, guarded by a `test` op so we never remove someone else's finalizer.
#[tokio::test]
async fn cleanup_removes_finalizer_at_its_index() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let api: Api<ConfigMap> = Api::default_namespaced(Client::new(mock_service, "default"));

// Our finalizer is at index 1.
let cm: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "test",
"namespace": "default",
"deletionTimestamp": "2024-01-01T00:00:00Z",
"finalizers": ["other/finalizer", FINALIZER],
},
}))
.unwrap();

let server = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("finalizer should be removed");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
json_patch_body(request).await,
serde_json::json!([
{ "op": "test", "path": "/metadata/finalizers/1", "value": FINALIZER },
{ "op": "remove", "path": "/metadata/finalizers/1" },
])
);
let patched: ConfigMap = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": { "name": "test", "namespace": "default", "finalizers": ["other/finalizer"] },
}))
.unwrap();
send.send_response(Response::new(Body::from(serde_json::to_vec(&patched).unwrap())));
});

let action = finalizer(&api, FINALIZER, Arc::new(cm), |event| async move {
assert!(matches!(event, Event::Cleanup(_)), "expected a Cleanup event");
Ok::<_, Infallible>(Action::await_change())
})
.await
.expect("finalizer removal should succeed");

assert_eq!(action, Action::await_change());
server.await.unwrap();
}
}
Loading