Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/services/backends/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod tests;
pub mod kubeconfig_loader;
pub mod kubernetes_resource_manager;
pub mod kubernetes_resource_watcher;
pub mod logging_update_handler;
pub mod repositories;
pub mod resource_update_handler;
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::object_o
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status;
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status::{Conflict, NotOwned};
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::owner_conflict_details::OwnerConflictDetails;
use crate::services::backends::kubernetes::kubernetes_resource_watcher::{
KubernetesResourceWatcher, ResourceUpdateHandler,
};
use crate::services::backends::kubernetes::kubernetes_resource_watcher::KubernetesResourceWatcher;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use anyhow::{Error, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
Expand Down Expand Up @@ -217,7 +216,7 @@ where
.for_each(move |r| {
let update_handler = update_handler.clone();
async move {
update_handler.handle_update(r).await;
update_handler.handle_update(&r).await;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod tests;
use super::{KubernetesResourceManager, KubernetesResourceManagerConfig, ResourceUpdateHandler, UpdateLabels};
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status;
use crate::services::backends::kubernetes::kubernetes_resource_watcher::KubernetesResourceWatcher;
use crate::services::backends::kubernetes::logging_update_handler::LoggingUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use anyhow::Error;
use kube::runtime::reflector::ObjectRef;
use serde::Serialize;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
use crate::services::backends::kubernetes::kubernetes_resource_manager::KubernetesResourceManagerConfig;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use kube::Resource;
use kube::runtime::watcher;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

#[async_trait]
pub trait ResourceUpdateHandler<S>: Send + Sync
where
S: Resource + Send + Sync,
{
async fn handle_update(&self, result: Result<S, watcher::Error>) -> ();
}

#[async_trait]
pub trait KubernetesResourceWatcher<H, R>: Sized
where
Expand Down
38 changes: 29 additions & 9 deletions src/services/backends/kubernetes/repositories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::status::
use crate::services::backends::kubernetes::kubernetes_resource_manager::{
KubernetesResourceManagerConfig, UpdateLabels,
};
use crate::services::backends::kubernetes::logging_update_handler::LoggingUpdateHandler;
use crate::services::backends::kubernetes::repositories::try_into_object_ref::TryIntoObjectRef;
use crate::services::base::upsert_repository::{CanDelete, ReadOnlyRepository, UpsertRepository};
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use crate::services::base::upsert_repository::{
CanDelete, ReadOnlyRepository, UpsertRepository, UpsertRepositoryWithDelete,
};
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
Expand All @@ -16,6 +19,7 @@ use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
Expand Down Expand Up @@ -57,26 +61,29 @@ where
Self: Sized;
}

pub struct KubernetesRepository<Resource>
pub struct KubernetesRepository<Resource, H = LoggingUpdateHandler>
where
Resource: kube::Resource + SoftDeleteResource + Send + Sync + 'static,
Resource::DynamicType: Hash + Eq,
{
pub resource_manager: SpinLockKubernetesResourceManager<Resource>,
pub operation_timeout: Duration,
_phantom: PhantomData<(Resource, H)>,
}

impl<R> KubernetesRepository<R>
impl<R, H> KubernetesRepository<R, H>
where
R: kube::Resource + SoftDeleteResource + UpdateLabels + Send + Sync + 'static,
R::DynamicType: Hash + Eq + Clone + Default,
H: ResourceUpdateHandler<R> + Send + Sync + 'static,
{
pub async fn start(config: KubernetesResourceManagerConfig) -> anyhow::Result<Self> {
pub async fn start(config: KubernetesResourceManagerConfig, update_handler: Arc<H>) -> anyhow::Result<Self> {
let operation_timeout = config.operation_timeout;
let resource_manager = SpinLockKubernetesResourceManager::start(config, Arc::new(LoggingUpdateHandler)).await?;
let resource_manager = SpinLockKubernetesResourceManager::start(config, update_handler).await?;
Ok(KubernetesRepository {
resource_manager,
operation_timeout,
_phantom: PhantomData,
})
}

Expand All @@ -98,12 +105,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> ReadOnlyRepository<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> ReadOnlyRepository<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + 'static,
Value: TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type ReadError = Status;

Expand All @@ -124,12 +132,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> CanDelete<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> CanDelete<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type DeleteError = Status;

Expand Down Expand Up @@ -164,12 +173,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> UpsertRepository<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> UpsertRepository<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: ToResource<Resource> + TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type Error = Status;

Expand Down Expand Up @@ -221,3 +231,13 @@ where
Ok(self.resource_manager.get(&object_ref).is_ok())
}
}

impl<Key, Value, Resource, H> UpsertRepositoryWithDelete<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: ToResource<Resource> + TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,5 @@ impl ToAuditRecord for SchemaFragment {
}
}

impl UpsertRepositoryWithDelete<String, SchemaFragment> for KubernetesRepository<SchemaDocument> {}

pub type SchemaRepository =
dyn UpsertRepositoryWithDelete<String, SchemaFragment, DeleteError = Status, Error = Status, ReadError = Status>;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::status::
use crate::services::backends::kubernetes::repositories::TryIntoObjectRef;
use crate::services::backends::kubernetes::repositories::schema_repository::test_reduced_schema::reduced_schema;
use crate::services::backends::kubernetes::repositories::schema_repository::test_schema::schema;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use crate::testing::api_extensions::{WaitForDelete, WaitForResource};
use crate::testing::spin_lock_kubernetes_resource_manager_context::SpinLockKubernetesResourceManagerTestContext;
use assert_matches::assert_matches;
Expand All @@ -13,6 +14,7 @@ use kube::api::PostParams;
use kube::runtime::reflector::ObjectRef;
use maplit::btreemap;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::time::Duration;
use test_context::{AsyncTestContext, test_context};

Expand All @@ -29,9 +31,10 @@ impl AsyncTestContext for KubernetesSchemaRepositoryTest {
async fn setup() -> KubernetesSchemaRepositoryTest {
let parent = SpinLockKubernetesResourceManagerTestContext::setup().await;
let label = parent.config.owner_mark.get_owner_name().clone();
let repository = Arc::new(KubernetesRepository {
let repository = Arc::new(KubernetesRepository::<SchemaDocument, LoggingUpdateHandler> {
resource_manager: parent.manager,
operation_timeout: parent.config.operation_timeout,
_phantom: PhantomData,
});
Self {
repository,
Expand Down
14 changes: 14 additions & 0 deletions src/services/backends/kubernetes/resource_update_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub mod composed_update_handler;
pub mod logging_update_handler;

use async_trait::async_trait;
use kube::Resource;
use kube::runtime::watcher;

#[async_trait]
pub trait ResourceUpdateHandler<S>: Send + Sync
where
S: Resource + Send + Sync,
{
async fn handle_update(&self, result: &Result<S, watcher::Error>) -> ();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use kube::Resource;
use std::fmt::Debug;

pub struct ComposedUpdateHandler<T> {
handlers: Vec<Box<dyn ResourceUpdateHandler<T>>>,
}

impl<T> ComposedUpdateHandler<T> {
pub fn new() -> Self {
Self { handlers: Vec::new() }
}

pub fn add_handler(mut self, handler: Box<dyn ResourceUpdateHandler<T>>) -> Self {
self.handlers.push(handler);
self
}
}

#[async_trait]
impl<T> ResourceUpdateHandler<T> for ComposedUpdateHandler<T>
where
T: Resource + Debug + Send + Sync + 'static,
{
async fn handle_update(&self, result: &Result<T, kube::runtime::watcher::Error>) -> () {
for handler in &self.handlers {
handler.handle_update(result).await;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
#[cfg(not(test))]
use log::{debug, warn};

#[cfg(test)]
use std::{println as warn, println as debug};

use crate::services::backends::kubernetes::kubernetes_resource_watcher::ResourceUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Resource;
use kube::runtime::watcher;
use log::{debug, warn};

use std::fmt::Debug;

Expand All @@ -19,8 +14,8 @@ impl<T> ResourceUpdateHandler<T> for LoggingUpdateHandler
where
T: Resource + Debug + Send + Sync + 'static,
{
async fn handle_update(&self, event: Result<T, watcher::Error>) -> () {
if let Err(e) = event {
async fn handle_update(&self, event: &Result<T, watcher::Error>) -> () {
if let Err(e) = event.as_ref() {
warn!("Error processing event: {}", e);
return;
}
Expand Down
33 changes: 29 additions & 4 deletions src/services/backends/memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::services::base::upsert_repository::{
CanDelete, ReadOnlyRepository, UpsertRepository, UpsertRepositoryWithDelete,
CanDelete, ReadOnlyRepository, ReadOnlyRepositoryWithFactory, UpsertRepository, UpsertRepositoryWithDelete,
ValueFactory,
};
use anyhow::bail;
use async_trait::async_trait;
Expand All @@ -9,7 +10,7 @@ use std::hash::Hash;
use tokio::sync::RwLock;

#[async_trait]
impl<Entity, Key> ReadOnlyRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> ReadOnlyRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Clone + Send + Sync,
Key: Debug + Eq + Hash + Send + Sync,
Expand All @@ -26,7 +27,7 @@ where
}

#[async_trait]
impl<Entity, Key> UpsertRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> UpsertRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug,
Expand Down Expand Up @@ -60,9 +61,33 @@ where
}
}

impl<Entity, Key> UpsertRepositoryWithDelete<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> UpsertRepositoryWithDelete<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug,
{
}

#[async_trait]
impl<Key, Entity> ReadOnlyRepositoryWithFactory<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug + Clone,
{
type ReadError = anyhow::Error;

async fn get(
&self,
key: Key,
create_new: &dyn ValueFactory<Key, Entity, CreateError = Self::ReadError>,
) -> Result<Entity, Self::ReadError> {
let mut write_guard = self.write().await;
if let Some(entity) = (*write_guard).get(&key) {
Ok(entity.clone())
} else {
let new_entity = create_new.create(&key).await?;
(*write_guard).insert(key, new_entity.clone());
Ok(new_entity)
}
}
}
20 changes: 20 additions & 0 deletions src/services/base/upsert_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ pub trait ReadOnlyRepository<Key, Entity>: Send + Sync {
async fn get(&self, key: Key) -> Result<Entity, Self::ReadError>;
}

#[async_trait]
pub trait ValueFactory<Key, Entity>: Send + Sync {
type CreateError;

async fn create(&self, key: &Key) -> Result<Entity, Self::CreateError>;
}

#[async_trait]
/// Represents a repository for policies
pub trait ReadOnlyRepositoryWithFactory<Key, Entity>: Send + Sync {
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment mentions 'policies' but this is a generic trait that can work with any entity type. Update the comment to reflect its generic nature.

Copilot uses AI. Check for mistakes.
type ReadError;

/// Retrieves a policy by id
async fn get(
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment mentions 'policy by id' but this method works with generic Key and Entity types. Update the comment to be more generic.

Copilot uses AI. Check for mistakes.
&self,
key: Key,
create_new: &dyn ValueFactory<Key, Entity, CreateError = Self::ReadError>,
) -> Result<Entity, Self::ReadError>;
}

#[async_trait]
/// Represents a repository for policies
pub trait CanDelete<Key, Entity>: Send + Sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::spin_loc
use crate::services::backends::kubernetes::kubernetes_resource_manager::{
KubernetesResourceManagerConfig, UpdateLabels,
};
use crate::services::backends::kubernetes::logging_update_handler::LoggingUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use crate::testing::api_client_context::ApiClientContext;
use serde::Serialize;
use serde::de::DeserializeOwned;
Expand Down