Skip to content

Commit c842031

Browse files
refactor: k8s clients (#1332)
The refactor consists in 1. Remove all methods in `DynamicObjectManagers` that called methods in the inner elements. Converting it into a real "collection". 2. Make sure that `SyncK8sClient` methods always call `AsyncK8sClient` methods.
1 parent 5fa4134 commit c842031

File tree

3 files changed

+162
-181
lines changed

3 files changed

+162
-181
lines changed

agent-control/src/k8s/client.rs

Lines changed: 105 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@ impl SyncK8sClient {
6565

6666
pub fn apply_dynamic_object(&self, obj: &DynamicObject) -> Result<(), K8sError> {
6767
self.runtime
68-
.block_on(self.async_client.dynamic_object_managers.apply(obj))
68+
.block_on(self.async_client.apply_dynamic_object(obj))
69+
}
70+
71+
pub fn apply_dynamic_object_if_changed(&self, obj: &DynamicObject) -> Result<(), K8sError> {
72+
self.runtime
73+
.block_on(self.async_client.apply_dynamic_object_if_changed(obj))
6974
}
7075

7176
pub fn patch_dynamic_object(
@@ -74,24 +79,8 @@ impl SyncK8sClient {
7479
name: &str,
7580
patch: serde_json::Value,
7681
) -> Result<DynamicObject, K8sError> {
77-
self.runtime.block_on(
78-
self.async_client
79-
.dynamic_object_managers
80-
.patch(tm, name, patch),
81-
)
82-
}
83-
84-
pub fn has_dynamic_object_changed(&self, obj: &DynamicObject) -> Result<bool, K8sError> {
8582
self.runtime
86-
.block_on(self.async_client.dynamic_object_managers.has_changed(obj))
87-
}
88-
89-
pub fn apply_dynamic_object_if_changed(&self, obj: &DynamicObject) -> Result<(), K8sError> {
90-
self.runtime.block_on(
91-
self.async_client
92-
.dynamic_object_managers
93-
.apply_if_changed(obj),
94-
)
83+
.block_on(self.async_client.patch_dynamic_object(tm, name, patch))
9584
}
9685

9786
pub fn get_dynamic_object(
@@ -100,15 +89,16 @@ impl SyncK8sClient {
10089
name: &str,
10190
) -> Result<Option<Arc<DynamicObject>>, K8sError> {
10291
self.runtime
103-
.block_on(self.async_client.dynamic_object_managers.get(tm, name))
92+
.block_on(self.async_client.get_dynamic_object(tm, name))
10493
}
94+
10595
pub fn delete_dynamic_object(
10696
&self,
10797
tm: &TypeMeta,
10898
name: &str,
10999
) -> Result<Either<DynamicObject, Status>, K8sError> {
110100
self.runtime
111-
.block_on(self.async_client.dynamic_object_managers.delete(tm, name))
101+
.block_on(self.async_client.delete_dynamic_object(tm, name))
112102
}
113103

114104
pub fn delete_dynamic_object_collection(
@@ -118,14 +108,18 @@ impl SyncK8sClient {
118108
) -> Result<Either<ObjectList<DynamicObject>, Status>, K8sError> {
119109
self.runtime.block_on(
120110
self.async_client
121-
.dynamic_object_managers
122-
.delete_collection(tm, label_selector),
111+
.delete_dynamic_object_collection(tm, label_selector),
123112
)
124113
}
125114

126115
pub fn list_dynamic_objects(&self, tm: &TypeMeta) -> Result<Vec<Arc<DynamicObject>>, K8sError> {
127116
self.runtime
128-
.block_on(self.async_client.dynamic_object_managers.list(tm))
117+
.block_on(self.async_client.list_dynamic_objects(tm))
118+
}
119+
120+
pub fn has_dynamic_object_changed(&self, obj: &DynamicObject) -> Result<bool, K8sError> {
121+
self.runtime
122+
.block_on(self.async_client.has_dynamic_object_changed(obj))
129123
}
130124

131125
pub fn delete_configmap_collection(&self, label_selector: &str) -> Result<(), K8sError> {
@@ -233,10 +227,6 @@ impl AsyncK8sClient {
233227
})
234228
}
235229

236-
pub fn dynamic_object_managers(&self) -> &DynamicObjectManagers {
237-
&self.dynamic_object_managers
238-
}
239-
240230
// Due to the Kube-rs library we need to retrieve with two different calls the versions of each object and then fetch the available kinds
241231
pub async fn list_api_resources(&self) -> Result<Vec<APIResourceList>, K8sError> {
242232
let mut list = vec![];
@@ -355,6 +345,94 @@ impl AsyncK8sClient {
355345
pub fn default_namespace(&self) -> &str {
356346
self.client.default_namespace()
357347
}
348+
349+
pub async fn apply_dynamic_object(&self, obj: &DynamicObject) -> Result<(), K8sError> {
350+
let type_meta = get_type_meta(obj)?;
351+
352+
self.dynamic_object_managers
353+
.get_or_create(&type_meta)
354+
.await?
355+
.apply(obj)
356+
.await
357+
}
358+
359+
pub async fn apply_dynamic_object_if_changed(
360+
&self,
361+
obj: &DynamicObject,
362+
) -> Result<(), K8sError> {
363+
let type_meta = get_type_meta(obj)?;
364+
365+
self.dynamic_object_managers
366+
.get_or_create(&type_meta)
367+
.await?
368+
.apply_if_changed(obj)
369+
.await
370+
}
371+
372+
pub async fn patch_dynamic_object(
373+
&self,
374+
type_meta: &TypeMeta,
375+
name: &str,
376+
patch: serde_json::Value,
377+
) -> Result<DynamicObject, K8sError> {
378+
self.dynamic_object_managers
379+
.get_or_create(type_meta)
380+
.await?
381+
.patch(name, patch)
382+
.await
383+
}
384+
385+
pub async fn get_dynamic_object(
386+
&self,
387+
tm: &TypeMeta,
388+
name: &str,
389+
) -> Result<Option<Arc<DynamicObject>>, K8sError> {
390+
Ok(self
391+
.dynamic_object_managers
392+
.get_or_create(tm)
393+
.await?
394+
.get(name))
395+
}
396+
397+
pub async fn delete_dynamic_object(
398+
&self,
399+
tm: &TypeMeta,
400+
name: &str,
401+
) -> Result<Either<DynamicObject, Status>, K8sError> {
402+
self.dynamic_object_managers
403+
.get_or_create(tm)
404+
.await?
405+
.delete(name)
406+
.await
407+
}
408+
409+
pub async fn delete_dynamic_object_collection(
410+
&self,
411+
tm: &TypeMeta,
412+
label_selector: &str,
413+
) -> Result<Either<ObjectList<DynamicObject>, Status>, K8sError> {
414+
self.dynamic_object_managers
415+
.get_or_create(tm)
416+
.await?
417+
.delete_collection(label_selector)
418+
.await
419+
}
420+
421+
pub async fn list_dynamic_objects(
422+
&self,
423+
tm: &TypeMeta,
424+
) -> Result<Vec<Arc<DynamicObject>>, K8sError> {
425+
Ok(self.dynamic_object_managers.get_or_create(tm).await?.list())
426+
}
427+
428+
pub async fn has_dynamic_object_changed(&self, obj: &DynamicObject) -> Result<bool, K8sError> {
429+
let tm = get_type_meta(obj)?;
430+
431+
self.dynamic_object_managers
432+
.get_or_create(&tm)
433+
.await?
434+
.has_changed(obj)
435+
}
358436
}
359437

360438
// delete_collection has been moved outside the client to be able to use mockall in the client

agent-control/src/k8s/dynamic_object.rs

Lines changed: 12 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
//! This module holds helpers and to perform k8s operations with resources whose type is known at runtime
22
//! (DynamicObjects).
33
use super::{
4-
client::{get_name, get_type_meta},
4+
client::{delete_collection, get_name},
55
error::K8sError,
66
reflector::definition::{Reflector, ReflectorBuilder},
77
utils::display_type,
88
};
9-
use crate::k8s::client::delete_collection;
109
use base64::engine::general_purpose::STANDARD;
1110
use either::Either;
1211
use kube::api::{ObjectList, Patch, PatchParams};
@@ -155,6 +154,13 @@ impl DynamicObjectManager {
155154
}
156155
Ok(either)
157156
}
157+
158+
pub async fn delete_collection(
159+
&self,
160+
label_selector: &str,
161+
) -> Result<Either<ObjectList<DynamicObject>, Status>, K8sError> {
162+
delete_collection(&self.api, label_selector).await
163+
}
158164
}
159165

160166
/// Holds a collection of [DynamicObjectManager] by [TypeMeta] to perform operations with objects known at runtime.
@@ -175,79 +181,10 @@ impl DynamicObjectManagers {
175181
}
176182
}
177183

178-
pub async fn get(
179-
&self,
180-
type_meta: &TypeMeta,
181-
name: &str,
182-
) -> Result<Option<Arc<DynamicObject>>, K8sError> {
183-
Ok(self.get_or_create_manager(type_meta).await?.get(name))
184-
}
185-
186-
pub async fn delete_collection(
187-
&self,
188-
type_meta: &TypeMeta,
189-
label_selector: &str,
190-
) -> Result<Either<ObjectList<DynamicObject>, Status>, K8sError> {
191-
let api = self.get_or_create_manager(type_meta).await?.api.clone();
192-
193-
delete_collection(&api, label_selector).await
194-
}
195-
196-
pub async fn delete(
197-
&self,
198-
type_meta: &TypeMeta,
199-
name: &str,
200-
) -> Result<Either<DynamicObject, Status>, K8sError> {
201-
self.get_or_create_manager(type_meta)
202-
.await?
203-
.delete(name)
204-
.await
205-
}
206-
207-
pub async fn list(&self, type_meta: &TypeMeta) -> Result<Vec<Arc<DynamicObject>>, K8sError> {
208-
Ok(self.get_or_create_manager(type_meta).await?.list())
209-
}
210-
211-
pub async fn apply(&self, obj: &DynamicObject) -> Result<(), K8sError> {
212-
let type_meta = &get_type_meta(obj)?;
213-
214-
self.get_or_create_manager(type_meta)
215-
.await?
216-
.apply(obj)
217-
.await
218-
}
219-
220-
pub async fn apply_if_changed(&self, obj: &DynamicObject) -> Result<(), K8sError> {
221-
let type_meta = &get_type_meta(obj)?;
222-
223-
self.get_or_create_manager(type_meta)
224-
.await?
225-
.apply_if_changed(obj)
226-
.await
227-
}
228-
229-
pub async fn patch(
230-
&self,
231-
type_meta: &TypeMeta,
232-
name: &str,
233-
patch: serde_json::Value,
234-
) -> Result<DynamicObject, K8sError> {
235-
self.get_or_create_manager(type_meta)
236-
.await?
237-
.patch(name, patch)
238-
.await
239-
}
240-
241-
pub async fn has_changed(&self, obj: &DynamicObject) -> Result<bool, K8sError> {
242-
let type_meta = &get_type_meta(obj)?;
243-
244-
self.get_or_create_manager(type_meta)
245-
.await?
246-
.has_changed(obj)
247-
}
248-
249-
/// Obtains the manager for the provided [TypeMeta]. If it does not exist it creates it and stores it.
250-
async fn get_or_create_manager(
184+
/// Returns the manager for the provided [TypeMeta].
185+
///
186+
/// If it does not exist it creates it and stores it.
187+
pub(super) async fn get_or_create(
251188
&self,
252189
type_meta: &TypeMeta,
253190
) -> Result<Arc<DynamicObjectManager>, K8sError> {

0 commit comments

Comments
 (0)