Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

[dev-dependencies]
parking_lot.workspace = true
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
Expand Down Expand Up @@ -130,6 +131,10 @@ path = "log_stream.rs"
name = "multi_watcher"
path = "multi_watcher.rs"

[[example]]
name = "multi_reflector"
path = "multi_reflector.rs"

[[example]]
name = "pod_api"
path = "pod_api.rs"
Expand Down
145 changes: 145 additions & 0 deletions examples/multi_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::{ApiResource, DynamicObject, GroupVersionKind},
core::TypedResource,
runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt},
Api, Client, Resource,
};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tracing::*;

use std::collections::HashMap;

type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;

#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
struct LookupKey {
gvk: GroupVersionKind,
name: Option<String>,
namespace: Option<String>,
}

impl LookupKey {
fn new<R: TypedResource>(resource: &R) -> LookupKey {
let meta = resource.meta();
LookupKey {
gvk: resource.gvk(),
name: meta.name.clone(),
namespace: meta.namespace.clone(),
}
}
}

#[derive(Default, Clone)]
struct MultiCache {
store: Cache,
}

impl MultiCache {
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
name: &str,
ns: &str,
) -> Option<Arc<K>> {
let obj = self
.store
.read()
.get(&LookupKey {
gvk: K::gvk(&Default::default()),
name: Some(name.into()),
namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
})?
.as_ref()
.clone();
obj.try_parse().ok().map(Arc::new)
}
}

impl CacheWriter<DynamicObject> for MultiCache {
/// Applies a single watcher event to the store
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
match event {
watcher::Event::Init | watcher::Event::InitDone => {}
watcher::Event::Delete(obj) => {
self.store.write().remove(&LookupKey::new(obj));
}
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
self.store
.write()
.insert(LookupKey::new(obj), Arc::new(obj.clone()));
}
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

// multistore
let mut combo_stream = stream::select_all(vec![]);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
Default::default(),
)
.boxed(),
);

// // Duplicate streams with narrowed down selection
combo_stream.push(
watcher::watcher(
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);

let multi_writer = MultiCache::default();
let watcher = combo_stream
.reflect(multi_writer.clone())
.applied_objects()
.for_each(|_| future::ready(()));

// simulate doing stuff with the stores from some other thread
tokio::spawn(async move {
// can use helper accessors
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("cache content: {:?}", multi_writer.store.read().keys());
info!(
"common cm: {:?}",
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
);
// access individual sub stores
info!("Current objects count: {}", multi_writer.store.read().len());
}
});
info!("long watches starting");
tokio::select! {
r = watcher => println!("watcher exit: {r:?}"),
}

Ok(())
}
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod gvk;
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};

pub mod metadata;
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};

pub mod labels;

Expand Down
137 changes: 136 additions & 1 deletion kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{borrow::Cow, marker::PhantomData};
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
use serde::{Deserialize, Serialize};

use crate::{DynamicObject, Resource};
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};

/// Type information that is flattened into every kubernetes object
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -51,6 +51,24 @@ impl TypeMeta {
kind: K::kind(&()).into(),
}
}

/// Construct a new `TypeMeta` for the object from the list `TypeMeta`.
///
/// ```
/// # use k8s_openapi::api::core::v1::Pod;
/// # use kube_core::TypeMeta;
///
/// let mut type_meta = TypeMeta::resource::<Pod>();
/// type_meta.kind = "PodList".to_string();
/// assert_eq!(type_meta.clone().singular().kind, "Pod");
/// assert_eq!(type_meta.clone().singular().api_version, "v1");
/// ```
pub fn singular(self) -> Self {
Self {
kind: self.kind.strip_suffix("List").unwrap_or(&self.kind).to_string(),
..self
}
}
Comment on lines +55 to +69
Copy link
Copy Markdown
Member

@clux clux Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this fn is nice by itself. but it relies on context that is not clear from the name;

  • it's not clear that it works on a List type
  • it's not clear it's a noop on a non-list type

maybe it's better to have this fn named as fn singularize_list to resolve this.
feel free to do this as it's own pr.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}

/// A generic representation of any object with `ObjectMeta`.
Expand Down Expand Up @@ -175,6 +193,123 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
}
}

///
pub trait TypedResource: Resource + Sized {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this trait differ from one of our existing ones here? How does this help out with solving the issue? I see a lot of trait magic here, and it's non-trivial for me to try to decipher this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem with anything else available. DynamicObject is a special case of resource, and unlike any typed resource it has types field we can use.

Unfortunately for anything generated by k8s-openapi or derive CustomResourceDefinition types, TypeMeta is available from constant kind, group and version. There is no type field we can use, so data, even if returned from API server, is lost.

This ✨ magic ✨ makes it possible to have GVK of a resource available as if in a blanket implementation, and it is used for identification of resources in dynamic watchers cache, for routing of events.

This dynamic cache can later be used as a source to establish dynamic watches on statically typed resources, if used as a stream which filters resources by GVK and serializes it to the expected type. This is not a part of this implementation yet, thought.

Such cache, if passed around one or multiple controllers, may serve as an up-to-date state of all watched objects for read operations.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JIC: I removed this change as after some debugging it appears that:

  • List returns no TypeMeta on the resource. It is the only place where this trait and additional serialization step is needed.
  • When using streamingList watch option, enabled by default starting from k/k 1.32 no list requests are sent to k8s API server, so it is not needed.

This implementation is therefore focused on 1.32 clusters with streamingList setting enabled for watchers.

///
fn type_meta(&self) -> TypeMeta;
///
fn gvk(&self) -> GroupVersionKind;
///
fn kind(&self) -> Cow<'_, str>;
///
fn group(&self) -> Cow<'_, str>;
///
fn version(&self) -> Cow<'_, str>;
///
fn plural(&self) -> Cow<'_, str>;
}

impl<K> TypedResource for K
where
K: Resource,
(K, K::DynamicType): TypedResourceImpl<Resource = K>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol. yeah, this is a bit too much magic imo.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now removed, for context: #1692 (comment)

{
fn type_meta(&self) -> TypeMeta {
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)
}

fn gvk(&self) -> GroupVersionKind {
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self)
}

fn kind(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::kind(self)
}
///
fn group(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::group(self)
}
///
fn version(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::version(self)
}
///
fn plural(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::plural(self)
}
}

#[doc(hidden)]
// Workaround for https://github.com/rust-lang/rust/issues/20400
pub trait TypedResourceImpl {
type Resource: Resource;
fn type_meta(res: &Self::Resource) -> TypeMeta;
fn gvk(res: &Self::Resource) -> GroupVersionKind;
fn kind(res: &Self::Resource) -> Cow<'_, str>;
fn group(res: &Self::Resource) -> Cow<'_, str>;
fn version(res: &Self::Resource) -> Cow<'_, str>;
fn plural(res: &Self::Resource) -> Cow<'_, str>;
}

impl<K> TypedResourceImpl for (K, ())
where
K: Resource<DynamicType = ()>,
{
type Resource = K;

fn type_meta(_: &Self::Resource) -> TypeMeta {
TypeMeta::resource::<K>()
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())
}

fn kind(_: &Self::Resource) -> Cow<'_, str> {
K::kind(&())
}

fn group(_: &Self::Resource) -> Cow<'_, str> {
K::group(&())
}

fn version(_: &Self::Resource) -> Cow<'_, str> {
K::version(&())
}

fn plural(_: &Self::Resource) -> Cow<'_, str> {
K::plural(&())
}
}

impl TypedResourceImpl for (DynamicObject, ApiResource) {
type Resource = DynamicObject;

fn type_meta(obj: &Self::Resource) -> TypeMeta {
obj.types.clone().unwrap_or_default()
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
res.type_meta().try_into().unwrap_or_default()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have derived Default on GroupVersionKind for this afaikt, but this is not particularly useful. I would rather not derive this Default (because it's not useful in the default state), and have this fn return an Option

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fixed.

}

fn kind(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.type_meta().kind)
}

fn group(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().group)
}

fn version(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().version)
}

fn plural(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(ApiResource::from_gvk(&res.gvk()).plural)
}
}

#[cfg(test)]
mod test {
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
Expand Down
Loading