Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ jobs:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v2
- uses: helm/kind-action@v1
with:
cluster_name: k8s-controller-test
wait: 180s
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.89.0
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/target-ra
/Cargo.lock
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "k8s-controller"
version = "0.10.0"
version = "0.11.0"
edition = "2024"
rust-version = "1.89.0"

Expand All @@ -13,7 +13,8 @@ include = ["src/**/*", "LICENSE", "README.md", "CHANGELOG.md"]
[dependencies]
async-trait = "0.1"
futures = "0.3"
kube = { version = "3.0.0", default-features = false, features = ["client"] }
json-patch = "4"
kube = { version = "3.0.0", default-features = false, features = ["client", "jsonpatch"] }
kube-runtime = "3.0.0"
rand = "0.9.2"
serde = "1"
Expand All @@ -22,7 +23,8 @@ tracing = "0.1"

[dev-dependencies]
k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_32"] }
tokio = "1"
kube = { version = "3.0.0", default-features = false, features = ["client", "jsonpatch", "rustls-tls", "aws-lc-rs"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "process"] }

[package.metadata.docs.rs]
features = ["k8s-openapi/latest"]
11 changes: 4 additions & 7 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,20 @@ ignore = [
[bans]
multiple-versions = "deny"
skip = [
{ name = "thiserror", version = "1.0.69" },
{ name = "thiserror-impl", version = "1.0.69" },
# kube-runtime pulls hashbrown 0.16, while serde_yaml (via indexmap) pulls 0.17.
{ name = "hashbrown", version = "0.16.1" },
]

# Use `tracing` instead.
[[bans.deny]]
name = "env_logger"

# We prefer the system's native TLS or OpenSSL to Rustls, since they are more
# mature and more widely used.
[[bans.deny]]
name = "rustls"

[licenses]
version = 2
allow = [
"Apache-2.0",
"BSD-3-Clause",
"ISC",
"MIT",
"Unicode-3.0",
"Zlib",
Expand Down
167 changes: 136 additions & 31 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::collections::BTreeMap;
use std::error::Error as _;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use futures::future::FutureExt;
use futures::stream::StreamExt;
use kube::api::Api;
use json_patch::jsonptr::PointerBuf;
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube::api::{Api, Patch, PatchParams};
use kube::core::{ClusterResourceScope, NamespaceResourceScope};
use kube::{Client, Resource, ResourceExt};
use kube_runtime::controller::Action;
use kube_runtime::finalizer::{Event, finalizer};
use kube_runtime::watcher;
use rand::{Rng, rng};
use tracing::{Level, event};
Expand Down Expand Up @@ -326,37 +328,140 @@ pub trait Context {
let kind = Self::Resource::kind(&dynamic_type).into_owned();
let mut ran = false;
let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
ran = true;
event!(
Level::INFO,
resource_name = %resource.name_unchecked().as_str(),
controller = Self::FINALIZER_NAME,
"Reconciling {} ({}).",
kind,
match event {
Event::Apply(_) => "apply",
Event::Cleanup(_) => "cleanup",
}
);
let action = match event {
Event::Apply(resource) => {
let action = self.apply(client, &resource).await?;
if let Some(action) = action {
action
} else {
self.success_action(&resource)
// We manage the finalizer ourselves rather than going through
// `kube_runtime::finalizer`, because that helper unconditionally
// removes the finalizer whenever the cleanup closure returns `Ok`.
// We only want to remove it when `cleanup` returns `Ok(None)`
// (cleanup complete); `Ok(Some(action))` means the caller wants to
// requeue and run cleanup again, so the finalizer must stay.
//
// The patches below mirror `kube_runtime::finalizer`: each is
// guarded by an RFC 6902 `Test` op so we fail (and a fresh
// reconcile fires) instead of clobbering another controller's
// finalizer if the list shifted underneath us.
let name = resource.name_unchecked();
let finalizer_i = resource
.finalizers()
.iter()
.position(|f| f == finalizer_name);
let is_deleting = resource.meta().deletion_timestamp.is_some();
match (finalizer_i, is_deleting) {
(Some(_), false) => {
// Finalizer present and not deleting: run the apply event.
ran = true;
event!(
Level::INFO,
resource_name = %name.as_str(),
controller = Self::FINALIZER_NAME,
"Reconciling {} (apply).",
kind,
);
let action = self.apply(client, &resource).await.map_err(|e| {
Error::FinalizerError(kube_runtime::finalizer::Error::ApplyFailed(e))
})?;
Ok(action.unwrap_or_else(|| self.success_action(&resource)))
}
(None, false) => {
// Not deleting and finalizer missing: add it. No point
// running apply here, since the patch triggers a fresh
// reconcile that lands in the arm above.
let patch = if resource.meta().finalizers.is_none() {
// No `finalizers` field at all: test that it's absent,
// then create it with our finalizer. (An empty array
// `Some([])` takes the append path below, since the
// field exists.)
json_patch::Patch(vec![
PatchOperation::Test(TestOperation {
path: PointerBuf::from_str("/metadata/finalizers")
.expect("constructed pointer is valid"),
// `serde_json::Value`'s default is `Null`; we
// don't depend on `serde_json` directly so name
// it via the field type rather than the path.
value: Default::default(),
}),
PatchOperation::Add(AddOperation {
path: PointerBuf::from_str("/metadata/finalizers")
.expect("constructed pointer is valid"),
value: vec![finalizer_name].into(),
}),
])
} else {
// Kubernetes doesn't deduplicate finalizers, so test the
// current list and fail/retry if anyone else appended in
// the meantime.
json_patch::Patch(vec![
PatchOperation::Test(TestOperation {
path: PointerBuf::from_str("/metadata/finalizers")
.expect("constructed pointer is valid"),
value: resource.finalizers().into(),
}),
PatchOperation::Add(AddOperation {
path: PointerBuf::from_str("/metadata/finalizers/-")
.expect("constructed pointer is valid"),
value: finalizer_name.into(),
}),
])
};
api.patch::<Self::Resource>(
&name,
&PatchParams::default(),
&Patch::Json(patch),
)
.await
.map_err(|e| {
Error::FinalizerError(kube_runtime::finalizer::Error::AddFinalizer(e))
})?;
Ok(Action::await_change())
}
(Some(finalizer_i), true) => {
// Deleting with our finalizer present: run cleanup. Only
// remove the finalizer once cleanup reports completion
// (`Ok(None)`); a returned action means "requeue and run
// cleanup again", so the finalizer stays in place.
ran = true;
event!(
Level::INFO,
resource_name = %name.as_str(),
controller = Self::FINALIZER_NAME,
"Reconciling {} (cleanup).",
kind,
);
match self.cleanup(client, &resource).await {
Err(e) => Err(Error::FinalizerError(
kube_runtime::finalizer::Error::CleanupFailed(e),
)),
Ok(Some(action)) => Ok(action),
Ok(None) => {
let finalizer_path = format!("/metadata/finalizers/{finalizer_i}");
let pointer = PointerBuf::from_str(&finalizer_path)
.expect("constructed pointer is valid");
let patch = json_patch::Patch(vec![
PatchOperation::Test(TestOperation {
path: pointer.clone(),
value: finalizer_name.into(),
}),
PatchOperation::Remove(RemoveOperation { path: pointer }),
]);
api.patch::<Self::Resource>(
&name,
&PatchParams::default(),
&Patch::Json(patch),
)
.await
.map_err(|e| {
Error::FinalizerError(
kube_runtime::finalizer::Error::RemoveFinalizer(e),
)
})?;
Ok(Action::await_change())
}
}
Event::Cleanup(resource) => self
.cleanup(client, &resource)
.await?
.unwrap_or_else(Action::await_change),
};
Ok(action)
})
.await
.map_err(Error::FinalizerError)
}
(None, true) => {
// Deleting and our finalizer already gone: nothing to do.
Ok(Action::await_change())
}
}
} else if resource.meta().deletion_timestamp.is_none() {
ran = true;
event!(
Expand Down
Loading
Loading