Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/services/backends/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod tests;
pub mod kubeconfig_loader;
pub mod kubernetes_resource_manager;
pub mod kubernetes_resource_watcher;
pub mod logging_update_handler;
pub mod repositories;
pub mod resource_update_handler;
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::object_o
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status;
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status::{Conflict, NotOwned};
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::owner_conflict_details::OwnerConflictDetails;
use crate::services::backends::kubernetes::kubernetes_resource_watcher::{
KubernetesResourceWatcher, ResourceUpdateHandler,
};
use crate::services::backends::kubernetes::kubernetes_resource_watcher::KubernetesResourceWatcher;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use anyhow::{Error, anyhow};
use async_trait::async_trait;
use futures::StreamExt;
Expand Down Expand Up @@ -217,7 +216,7 @@ where
.for_each(move |r| {
let update_handler = update_handler.clone();
async move {
update_handler.handle_update(r).await;
update_handler.handle_update(&r).await;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod tests;
use super::{KubernetesResourceManager, KubernetesResourceManagerConfig, ResourceUpdateHandler, UpdateLabels};
use crate::services::backends::kubernetes::kubernetes_resource_manager::status::Status;
use crate::services::backends::kubernetes::kubernetes_resource_watcher::KubernetesResourceWatcher;
use crate::services::backends::kubernetes::logging_update_handler::LoggingUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use anyhow::Error;
use kube::runtime::reflector::ObjectRef;
use serde::Serialize;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
use crate::services::backends::kubernetes::kubernetes_resource_manager::KubernetesResourceManagerConfig;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use kube::Resource;
use kube::runtime::watcher;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

#[async_trait]
pub trait ResourceUpdateHandler<S>: Send + Sync
where
S: Resource + Send + Sync,
{
async fn handle_update(&self, result: Result<S, watcher::Error>) -> ();
}

#[async_trait]
pub trait KubernetesResourceWatcher<H, R>: Sized
where
Expand Down
38 changes: 29 additions & 9 deletions src/services/backends/kubernetes/repositories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::status::
use crate::services::backends::kubernetes::kubernetes_resource_manager::{
KubernetesResourceManagerConfig, UpdateLabels,
};
use crate::services::backends::kubernetes::logging_update_handler::LoggingUpdateHandler;
use crate::services::backends::kubernetes::repositories::try_into_object_ref::TryIntoObjectRef;
use crate::services::base::upsert_repository::{CanDelete, ReadOnlyRepository, UpsertRepository};
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use crate::services::base::upsert_repository::{
CanDelete, ReadOnlyRepository, UpsertRepository, UpsertRepositoryWithDelete,
};
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
Expand All @@ -16,6 +19,7 @@ use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
Expand Down Expand Up @@ -57,26 +61,29 @@ where
Self: Sized;
}

pub struct KubernetesRepository<Resource>
pub struct KubernetesRepository<Resource, H = LoggingUpdateHandler>
where
Resource: kube::Resource + SoftDeleteResource + Send + Sync + 'static,
Resource::DynamicType: Hash + Eq,
{
pub resource_manager: SpinLockKubernetesResourceManager<Resource>,
pub operation_timeout: Duration,
_phantom: PhantomData<(Resource, H)>,
}

impl<R> KubernetesRepository<R>
impl<R, H> KubernetesRepository<R, H>
where
R: kube::Resource + SoftDeleteResource + UpdateLabels + Send + Sync + 'static,
R::DynamicType: Hash + Eq + Clone + Default,
H: ResourceUpdateHandler<R> + Send + Sync + 'static,
{
pub async fn start(config: KubernetesResourceManagerConfig) -> anyhow::Result<Self> {
pub async fn start(config: KubernetesResourceManagerConfig, update_handler: Arc<H>) -> anyhow::Result<Self> {
let operation_timeout = config.operation_timeout;
let resource_manager = SpinLockKubernetesResourceManager::start(config, Arc::new(LoggingUpdateHandler)).await?;
let resource_manager = SpinLockKubernetesResourceManager::start(config, update_handler).await?;
Ok(KubernetesRepository {
resource_manager,
operation_timeout,
_phantom: PhantomData,
})
}

Expand All @@ -98,12 +105,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> ReadOnlyRepository<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> ReadOnlyRepository<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + 'static,
Value: TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type ReadError = Status;

Expand All @@ -124,12 +132,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> CanDelete<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> CanDelete<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type DeleteError = Status;

Expand Down Expand Up @@ -164,12 +173,13 @@ where
}

#[async_trait]
impl<Key, Value, Resource> UpsertRepository<Key, Value> for KubernetesRepository<Resource>
impl<Key, Value, Resource, H> UpsertRepository<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: ToResource<Resource> + TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
type Error = Status;

Expand Down Expand Up @@ -221,3 +231,13 @@ where
Ok(self.resource_manager.get(&object_ref).is_ok())
}
}

impl<Key, Value, Resource, H> UpsertRepositoryWithDelete<Key, Value> for KubernetesRepository<Resource, H>
where
Resource: SoftDeleteResource + UpdateLabels,
Resource::DynamicType: Hash + Eq + Clone + Default,
Key: TryIntoObjectRef<Resource, Error = anyhow::Error> + Send + Sync + Clone + 'static,
Value: ToResource<Resource> + TryFromResource<Resource, Error = Status> + Send + Sync + 'static,
H: ResourceUpdateHandler<Resource> + Send + Sync + 'static,
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,5 @@ impl ToAuditRecord for SchemaFragment {
}
}

impl UpsertRepositoryWithDelete<String, SchemaFragment> for KubernetesRepository<SchemaDocument> {}

pub type SchemaRepository =
dyn UpsertRepositoryWithDelete<String, SchemaFragment, DeleteError = Status, Error = Status, ReadError = Status>;
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::services::backends::kubernetes::kubernetes_resource_manager::status::
use crate::services::backends::kubernetes::repositories::TryIntoObjectRef;
use crate::services::backends::kubernetes::repositories::schema_repository::test_reduced_schema::reduced_schema;
use crate::services::backends::kubernetes::repositories::schema_repository::test_schema::schema;
use crate::services::backends::kubernetes::resource_update_handler::logging_update_handler::LoggingUpdateHandler;
use crate::testing::api_extensions::{WaitForDelete, WaitForResource};
use crate::testing::spin_lock_kubernetes_resource_manager_context::SpinLockKubernetesResourceManagerTestContext;
use assert_matches::assert_matches;
Expand All @@ -13,6 +14,7 @@ use kube::api::PostParams;
use kube::runtime::reflector::ObjectRef;
use maplit::btreemap;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::time::Duration;
use test_context::{AsyncTestContext, test_context};

Expand All @@ -29,9 +31,10 @@ impl AsyncTestContext for KubernetesSchemaRepositoryTest {
async fn setup() -> KubernetesSchemaRepositoryTest {
let parent = SpinLockKubernetesResourceManagerTestContext::setup().await;
let label = parent.config.owner_mark.get_owner_name().clone();
let repository = Arc::new(KubernetesRepository {
let repository = Arc::new(KubernetesRepository::<SchemaDocument, LoggingUpdateHandler> {
resource_manager: parent.manager,
operation_timeout: parent.config.operation_timeout,
_phantom: PhantomData,
});
Self {
repository,
Expand Down
14 changes: 14 additions & 0 deletions src/services/backends/kubernetes/resource_update_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub mod composed_update_handler;
pub mod logging_update_handler;

use async_trait::async_trait;
use kube::Resource;
use kube::runtime::watcher;

#[async_trait]
pub trait ResourceUpdateHandler<S>: Send + Sync
where
S: Resource + Send + Sync,
{
async fn handle_update(&self, result: &Result<S, watcher::Error>) -> ();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use kube::Resource;
use std::fmt::Debug;

pub struct ComposedUpdateHandler<T> {
handlers: Vec<Box<dyn ResourceUpdateHandler<T>>>,
}

impl<T> ComposedUpdateHandler<T> {
pub fn new() -> Self {
Self { handlers: Vec::new() }
}

pub fn add_handler(mut self, handler: Box<dyn ResourceUpdateHandler<T>>) -> Self {
self.handlers.push(handler);
self
}
}

#[async_trait]
impl<T> ResourceUpdateHandler<T> for ComposedUpdateHandler<T>
where
T: Resource + Debug + Send + Sync + 'static,
{
async fn handle_update(&self, result: &Result<T, kube::runtime::watcher::Error>) -> () {
for handler in &self.handlers {
handler.handle_update(result).await;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
#[cfg(not(test))]
use log::{debug, warn};

#[cfg(test)]
use std::{println as warn, println as debug};

use crate::services::backends::kubernetes::kubernetes_resource_watcher::ResourceUpdateHandler;
use crate::services::backends::kubernetes::resource_update_handler::ResourceUpdateHandler;
use async_trait::async_trait;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Resource;
use kube::runtime::watcher;
use log::{debug, warn};

use std::fmt::Debug;

Expand All @@ -19,8 +14,8 @@ impl<T> ResourceUpdateHandler<T> for LoggingUpdateHandler
where
T: Resource + Debug + Send + Sync + 'static,
{
async fn handle_update(&self, event: Result<T, watcher::Error>) -> () {
if let Err(e) = event {
async fn handle_update(&self, event: &Result<T, watcher::Error>) -> () {
if let Err(e) = event.as_ref() {
warn!("Error processing event: {}", e);
return;
}
Expand Down
43 changes: 39 additions & 4 deletions src/services/backends/memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::services::base::upsert_repository::{
CanDelete, ReadOnlyRepository, UpsertRepository, UpsertRepositoryWithDelete,
CanDelete, ReadOnlyRepository, ReadOnlyRepositoryWithFactory, UpsertRepository, UpsertRepositoryWithDelete,
ValueFactory,
};
use anyhow::bail;
use async_trait::async_trait;
Expand All @@ -9,7 +10,7 @@ use std::hash::Hash;
use tokio::sync::RwLock;

#[async_trait]
impl<Entity, Key> ReadOnlyRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> ReadOnlyRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Clone + Send + Sync,
Key: Debug + Eq + Hash + Send + Sync,
Expand All @@ -26,7 +27,7 @@ where
}

#[async_trait]
impl<Entity, Key> UpsertRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> UpsertRepository<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug,
Expand Down Expand Up @@ -60,9 +61,43 @@ where
}
}

impl<Entity, Key> UpsertRepositoryWithDelete<Key, Entity> for RwLock<HashMap<Key, Entity>>
impl<Key, Entity> UpsertRepositoryWithDelete<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug,
{
}

#[async_trait]
impl<Key, Entity> ReadOnlyRepositoryWithFactory<Key, Entity> for RwLock<HashMap<Key, Entity>>
where
Entity: Send + Sync + Clone,
Key: Send + Sync + Eq + Hash + Debug + Clone,
{
type ReadError = anyhow::Error;

async fn get(
&self,
key: Key,
create_new: &dyn ValueFactory<Key, Entity, CreateError = Self::ReadError>,
) -> Result<Entity, Self::ReadError> {
// First, acquire a read lock to check if the entity exists.
{
let read_guard = self.read().await;
if let Some(entity) = (*read_guard).get(&key) {
return Ok(entity.clone());
}
}
// Release the read lock before calling the factory.
let new_entity = create_new.create(&key).await?;
// Acquire a write lock to insert the new entity.
let mut write_guard = self.write().await;
// Check again in case another thread inserted it while we were creating.
if let Some(entity) = (*write_guard).get(&key) {
Ok(entity.clone())
} else {
(*write_guard).insert(key, new_entity.clone());
Ok(new_entity)
}
}
}
Loading