|
1 | 1 | use std::collections::BTreeMap; |
2 | 2 | use std::error::Error as _; |
| 3 | +use std::str::FromStr; |
3 | 4 | use std::sync::{Arc, Mutex}; |
4 | 5 | use std::time::Duration; |
5 | 6 |
|
6 | 7 | use futures::future::FutureExt; |
7 | 8 | use futures::stream::StreamExt; |
8 | | -use kube::api::Api; |
| 9 | +use json_patch::jsonptr::PointerBuf; |
| 10 | +use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation}; |
| 11 | +use kube::api::{Api, Patch, PatchParams}; |
9 | 12 | use kube::core::{ClusterResourceScope, NamespaceResourceScope}; |
10 | 13 | use kube::{Client, Resource, ResourceExt}; |
11 | 14 | use kube_runtime::controller::Action; |
12 | | -use kube_runtime::finalizer::{Event, finalizer}; |
13 | 15 | use kube_runtime::watcher; |
14 | 16 | use rand::{Rng, rng}; |
15 | 17 | use tracing::{Level, event}; |
@@ -326,37 +328,140 @@ pub trait Context { |
326 | 328 | let kind = Self::Resource::kind(&dynamic_type).into_owned(); |
327 | 329 | let mut ran = false; |
328 | 330 | let res = if let Some(finalizer_name) = Self::FINALIZER_NAME { |
329 | | - finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async { |
330 | | - ran = true; |
331 | | - event!( |
332 | | - Level::INFO, |
333 | | - resource_name = %resource.name_unchecked().as_str(), |
334 | | - controller = Self::FINALIZER_NAME, |
335 | | - "Reconciling {} ({}).", |
336 | | - kind, |
337 | | - match event { |
338 | | - Event::Apply(_) => "apply", |
339 | | - Event::Cleanup(_) => "cleanup", |
340 | | - } |
341 | | - ); |
342 | | - let action = match event { |
343 | | - Event::Apply(resource) => { |
344 | | - let action = self.apply(client, &resource).await?; |
345 | | - if let Some(action) = action { |
346 | | - action |
347 | | - } else { |
348 | | - self.success_action(&resource) |
| 331 | + // We manage the finalizer ourselves rather than going through |
| 332 | + // `kube_runtime::finalizer`, because that helper unconditionally |
| 333 | + // removes the finalizer whenever the cleanup closure returns `Ok`. |
| 334 | + // We only want to remove it when `cleanup` returns `Ok(None)` |
| 335 | + // (cleanup complete); `Ok(Some(action))` means the caller wants to |
| 336 | + // requeue and run cleanup again, so the finalizer must stay. |
| 337 | + // |
| 338 | + // The patches below mirror `kube_runtime::finalizer`: each is |
| 339 | + // guarded by an RFC 6902 `Test` op so we fail (and a fresh |
| 340 | + // reconcile fires) instead of clobbering another controller's |
| 341 | + // finalizer if the list shifted underneath us. |
| 342 | + let name = resource.name_unchecked(); |
| 343 | + let finalizer_i = resource |
| 344 | + .finalizers() |
| 345 | + .iter() |
| 346 | + .position(|f| f == finalizer_name); |
| 347 | + let is_deleting = resource.meta().deletion_timestamp.is_some(); |
| 348 | + match (finalizer_i, is_deleting) { |
| 349 | + (Some(_), false) => { |
| 350 | + // Finalizer present and not deleting: run the apply event. |
| 351 | + ran = true; |
| 352 | + event!( |
| 353 | + Level::INFO, |
| 354 | + resource_name = %name.as_str(), |
| 355 | + controller = Self::FINALIZER_NAME, |
| 356 | + "Reconciling {} (apply).", |
| 357 | + kind, |
| 358 | + ); |
| 359 | + let action = self.apply(client, &resource).await.map_err(|e| { |
| 360 | + Error::FinalizerError(kube_runtime::finalizer::Error::ApplyFailed(e)) |
| 361 | + })?; |
| 362 | + Ok(action.unwrap_or_else(|| self.success_action(&resource))) |
| 363 | + } |
| 364 | + (None, false) => { |
| 365 | + // Not deleting and finalizer missing: add it. No point |
| 366 | + // running apply here, since the patch triggers a fresh |
| 367 | + // reconcile that lands in the arm above. |
| 368 | + let patch = if resource.meta().finalizers.is_none() { |
| 369 | + // No `finalizers` field at all: test that it's absent, |
| 370 | + // then create it with our finalizer. (An empty array |
| 371 | + // `Some([])` takes the append path below, since the |
| 372 | + // field exists.) |
| 373 | + json_patch::Patch(vec![ |
| 374 | + PatchOperation::Test(TestOperation { |
| 375 | + path: PointerBuf::from_str("/metadata/finalizers") |
| 376 | + .expect("constructed pointer is valid"), |
| 377 | + // `serde_json::Value`'s default is `Null`; we |
| 378 | + // don't depend on `serde_json` directly so name |
| 379 | + // it via the field type rather than the path. |
| 380 | + value: Default::default(), |
| 381 | + }), |
| 382 | + PatchOperation::Add(AddOperation { |
| 383 | + path: PointerBuf::from_str("/metadata/finalizers") |
| 384 | + .expect("constructed pointer is valid"), |
| 385 | + value: vec![finalizer_name].into(), |
| 386 | + }), |
| 387 | + ]) |
| 388 | + } else { |
| 389 | + // Kubernetes doesn't deduplicate finalizers, so test the |
| 390 | + // current list and fail/retry if anyone else appended in |
| 391 | + // the meantime. |
| 392 | + json_patch::Patch(vec![ |
| 393 | + PatchOperation::Test(TestOperation { |
| 394 | + path: PointerBuf::from_str("/metadata/finalizers") |
| 395 | + .expect("constructed pointer is valid"), |
| 396 | + value: resource.finalizers().into(), |
| 397 | + }), |
| 398 | + PatchOperation::Add(AddOperation { |
| 399 | + path: PointerBuf::from_str("/metadata/finalizers/-") |
| 400 | + .expect("constructed pointer is valid"), |
| 401 | + value: finalizer_name.into(), |
| 402 | + }), |
| 403 | + ]) |
| 404 | + }; |
| 405 | + api.patch::<Self::Resource>( |
| 406 | + &name, |
| 407 | + &PatchParams::default(), |
| 408 | + &Patch::Json(patch), |
| 409 | + ) |
| 410 | + .await |
| 411 | + .map_err(|e| { |
| 412 | + Error::FinalizerError(kube_runtime::finalizer::Error::AddFinalizer(e)) |
| 413 | + })?; |
| 414 | + Ok(Action::await_change()) |
| 415 | + } |
| 416 | + (Some(finalizer_i), true) => { |
| 417 | + // Deleting with our finalizer present: run cleanup. Only |
| 418 | + // remove the finalizer once cleanup reports completion |
| 419 | + // (`Ok(None)`); a returned action means "requeue and run |
| 420 | + // cleanup again", so the finalizer stays in place. |
| 421 | + ran = true; |
| 422 | + event!( |
| 423 | + Level::INFO, |
| 424 | + resource_name = %name.as_str(), |
| 425 | + controller = Self::FINALIZER_NAME, |
| 426 | + "Reconciling {} (cleanup).", |
| 427 | + kind, |
| 428 | + ); |
| 429 | + match self.cleanup(client, &resource).await { |
| 430 | + Err(e) => Err(Error::FinalizerError( |
| 431 | + kube_runtime::finalizer::Error::CleanupFailed(e), |
| 432 | + )), |
| 433 | + Ok(Some(action)) => Ok(action), |
| 434 | + Ok(None) => { |
| 435 | + let finalizer_path = format!("/metadata/finalizers/{finalizer_i}"); |
| 436 | + let pointer = PointerBuf::from_str(&finalizer_path) |
| 437 | + .expect("constructed pointer is valid"); |
| 438 | + let patch = json_patch::Patch(vec![ |
| 439 | + PatchOperation::Test(TestOperation { |
| 440 | + path: pointer.clone(), |
| 441 | + value: finalizer_name.into(), |
| 442 | + }), |
| 443 | + PatchOperation::Remove(RemoveOperation { path: pointer }), |
| 444 | + ]); |
| 445 | + api.patch::<Self::Resource>( |
| 446 | + &name, |
| 447 | + &PatchParams::default(), |
| 448 | + &Patch::Json(patch), |
| 449 | + ) |
| 450 | + .await |
| 451 | + .map_err(|e| { |
| 452 | + Error::FinalizerError( |
| 453 | + kube_runtime::finalizer::Error::RemoveFinalizer(e), |
| 454 | + ) |
| 455 | + })?; |
| 456 | + Ok(Action::await_change()) |
349 | 457 | } |
350 | 458 | } |
351 | | - Event::Cleanup(resource) => self |
352 | | - .cleanup(client, &resource) |
353 | | - .await? |
354 | | - .unwrap_or_else(Action::await_change), |
355 | | - }; |
356 | | - Ok(action) |
357 | | - }) |
358 | | - .await |
359 | | - .map_err(Error::FinalizerError) |
| 459 | + } |
| 460 | + (None, true) => { |
| 461 | + // Deleting and our finalizer already gone: nothing to do. |
| 462 | + Ok(Action::await_change()) |
| 463 | + } |
| 464 | + } |
360 | 465 | } else if resource.meta().deletion_timestamp.is_none() { |
361 | 466 | ran = true; |
362 | 467 | event!( |
|
0 commit comments