Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 9 additions & 26 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use kube::{
api::{Api, ApiResource, DynamicObject, GroupVersionKind, Resource, ResourceExt},
runtime::{WatchStreamExt, metadata_watcher, watcher, watcher::Event},
api::{Api, DynamicObject, GroupVersionKind, ResourceExt},
runtime::{WatchStreamExt, watcher},
};
use serde::de::DeserializeOwned;
use tracing::*;

use std::{env, fmt::Debug};
use std::env;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = kube::Client::try_default().await?;

// If set will receive only the metadata for watched resources
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);

// Take dynamic resource identifiers:
let group = env::var("GROUP").unwrap_or_else(|_| "".into());
let version = env::var("VERSION").unwrap_or_else(|_| "v1".into());
Expand All @@ -28,28 +24,15 @@ async fn main() -> anyhow::Result<()> {

// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);
let wc = watcher::Config::default();

// Start a metadata or a full resource watch
if watch_metadata {
handle_events(metadata_watcher(api, wc), &ar).await
} else {
handle_events(watcher(api, wc), &ar).await
}
}

async fn handle_events<
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + DeserializeOwned + 'static,
>(
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
ar: &ApiResource,
) -> anyhow::Result<()> {
let mut items = stream.applied_objects().boxed();
// For metadata-only watching, use Api::<PartialObjectMeta<DynamicObject>> instead.
// PartialObjectMeta-based Api automatically uses efficient metadata-only requests.
let mut items = watcher(api, watcher::Config::default()).applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if let Some(ns) = p.namespace() {
info!("saw {} {} in {ns}", K::kind(ar), p.name_any());
info!("saw {kind} {} in {ns}", p.name_any());
} else {
info!("saw {} {}", K::kind(ar), p.name_any());
info!("saw {kind} {}", p.name_any());
}
trace!("full obj: {p:?}");
}
Expand Down
28 changes: 24 additions & 4 deletions kube-client/src/api/core_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ where
/// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
/// Consider using [`Api::get_opt`] if you need to handle missing objects.
pub async fn get_with(&self, name: &str, gp: &GetParams) -> Result<K> {
let mut req = self.request.get(name, gp).map_err(Error::BuildRequest)?;
let mut req = if self.metadata_api {
self.request.get_metadata(name, gp)
} else {
self.request.get(name, gp)
}
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.client.request::<K>(req).await
}
Expand Down Expand Up @@ -215,7 +220,12 @@ where
/// # }
/// ```
pub async fn list(&self, lp: &ListParams) -> Result<ObjectList<K>> {
let mut req = self.request.list(lp).map_err(Error::BuildRequest)?;
let mut req = if self.metadata_api {
self.request.list_metadata(lp)
} else {
self.request.list(lp)
}
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("list");
self.client.request::<ObjectList<K>>(req).await
}
Expand Down Expand Up @@ -381,7 +391,12 @@ where
pp: &PatchParams,
patch: &Patch<P>,
) -> Result<K> {
let mut req = self.request.patch(name, pp, patch).map_err(Error::BuildRequest)?;
let mut req = if self.metadata_api {
self.request.patch_metadata(name, pp, patch)
} else {
self.request.patch(name, pp, patch)
}
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("patch");
self.client.request::<K>(req).await
}
Expand Down Expand Up @@ -538,7 +553,12 @@ where
wp: &WatchParams,
version: &str,
) -> Result<impl Stream<Item = Result<WatchEvent<K>>> + use<K>> {
let mut req = self.request.watch(wp, version).map_err(Error::BuildRequest)?;
let mut req = if self.metadata_api {
self.request.watch_metadata(wp, version)
} else {
self.request.watch(wp, version)
}
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("watch");
self.client.request_events::<K>(req).await
}
Expand Down
8 changes: 8 additions & 0 deletions kube-client/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub struct Api<K> {
/// The client to use (from this library)
pub(crate) client: Client,
namespace: Option<String>,
/// Whether requests should use metadata-only Accept headers
/// (cached from `K::metadata_api()` at construction so that `impl<K> Api<K>`
/// method blocks don't have to tighten to `K: Resource`).
pub(crate) metadata_api: bool,
/// Note: Using `iter::Empty` over `PhantomData`, because we never actually keep any
/// `K` objects, so `Empty` better models our constraints (in particular, `Empty<K>`
/// is `Send`, even if `K` may not be).
Expand Down Expand Up @@ -83,6 +87,7 @@ impl<K: Resource> Api<K> {
client,
request: Request::new(url),
namespace: None,
metadata_api: K::metadata_api(),
_phantom: std::iter::empty(),
}
}
Expand All @@ -100,6 +105,7 @@ impl<K: Resource> Api<K> {
client,
request: Request::new(url),
namespace: Some(ns.to_string()),
metadata_api: K::metadata_api(),
_phantom: std::iter::empty(),
}
}
Expand Down Expand Up @@ -194,6 +200,7 @@ where
client,
request: Request::new(url),
namespace: Some(ns.to_string()),
metadata_api: K::metadata_api(),
_phantom: std::iter::empty(),
}
}
Expand Down Expand Up @@ -241,6 +248,7 @@ impl<K> Debug for Api<K> {
request,
client: _,
namespace,
metadata_api: _,
_phantom,
} = self;
f.debug_struct("Api")
Expand Down
112 changes: 112 additions & 0 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ mod tests {

use http::{Request, Response};
use k8s_openapi::api::core::v1::Pod;
use kube_core::metadata::PartialObjectMeta;
use tower_test::mock;

#[tokio::test]
Expand Down Expand Up @@ -665,4 +666,115 @@ mod tests {
assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
spawned.await.unwrap();
}

#[tokio::test]
async fn test_partial_object_meta_get_uses_metadata_header() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
// Verify the metadata-only Accept header is set
assert_eq!(
request.headers().get(http::header::ACCEPT).unwrap(),
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
);
let pod_meta: PartialObjectMeta<Pod> =
serde_json::from_value(serde_json::json!({
"apiVersion": "meta.k8s.io/v1",
"kind": "PartialObjectMetadata",
"metadata": {
"name": "test",
},
}))
.unwrap();
send.send_response(Response::new(Body::from(
serde_json::to_vec(&pod_meta).unwrap(),
)));
});

let pods: Api<PartialObjectMeta<Pod>> =
Api::default_namespaced(Client::new(mock_service, "default"));
let pod = pods.get("test").await.unwrap();
assert_eq!(pod.metadata.name, Some("test".to_string()));
spawned.await.unwrap();
}

#[tokio::test]
async fn test_partial_object_meta_list_uses_metadata_header() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(
request.headers().get(http::header::ACCEPT).unwrap(),
"application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1"
);
send.send_response(Response::new(Body::from(
serde_json::to_vec(&serde_json::json!({
"apiVersion": "meta.k8s.io/v1",
"kind": "PartialObjectMetadataList",
"metadata": { "resourceVersion": "1" },
"items": [],
}))
.unwrap(),
)));
});

let pods: Api<PartialObjectMeta<Pod>> =
Api::default_namespaced(Client::new(mock_service, "default"));
let list = pods.list(&Default::default()).await.unwrap();
assert_eq!(list.items.len(), 0);
spawned.await.unwrap();
}

#[tokio::test]
async fn test_partial_object_meta_patch_uses_metadata_header() {
use kube_core::params::{Patch, PatchParams};

let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.headers().get(http::header::ACCEPT).unwrap(),
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
);
send.send_response(Response::new(Body::from(
serde_json::to_vec(&serde_json::json!({
"apiVersion": "meta.k8s.io/v1",
"kind": "PartialObjectMetadata",
"metadata": { "name": "test" },
}))
.unwrap(),
)));
});

let pods: Api<PartialObjectMeta<Pod>> =
Api::default_namespaced(Client::new(mock_service, "default"));
let patch = Patch::Merge(serde_json::json!({ "metadata": { "labels": { "a": "b" } } }));
let pod = pods.patch("test", &PatchParams::default(), &patch).await.unwrap();
assert_eq!(pod.metadata.name, Some("test".to_string()));
spawned.await.unwrap();
}

#[tokio::test]
async fn test_partial_object_meta_watch_uses_metadata_header() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(
request.headers().get(http::header::ACCEPT).unwrap(),
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
);
Comment thread
clux marked this conversation as resolved.
// Send an empty response to close the stream
send.send_response(Response::new(Body::from(vec![])));
});

let pods: Api<PartialObjectMeta<Pod>> =
Api::default_namespaced(Client::new(mock_service, "default"));
let _ = pods.watch(&Default::default(), "0").await;
spawned.await.unwrap();
}
}
12 changes: 12 additions & 0 deletions kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
type DynamicType = K::DynamicType;
type Scope = K::Scope;

fn metadata_api() -> bool {
true
}

fn kind(dt: &Self::DynamicType) -> Cow<'_, str> {
K::kind(dt)
}
Expand Down Expand Up @@ -207,4 +211,12 @@ mod test {
assert_eq!(response_pom.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
assert_eq!(response_pom.types.as_ref().unwrap().kind, "PartialObjectMetadata");
}

#[test]
fn metadata_api_flag() {
// PartialObjectMeta should signal metadata-only API requests
assert!(PartialObjectMeta::<Pod>::metadata_api());
// Regular resources should not
assert!(!Pod::metadata_api());
}
}
26 changes: 26 additions & 0 deletions kube-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,32 @@ pub trait Resource {
/// This is known as the resource in apimachinery, we rename it for disambiguation.
fn plural(dt: &Self::DynamicType) -> Cow<'_, str>;

/// Returns whether this type should use metadata-only API requests
///
/// When `true`, [`Api`] methods like `get`, `list`, `watch`, and `patch` will
/// automatically use metadata-optimized Accept headers (e.g.
/// `application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1`), so the API
/// server only returns object metadata instead of the full object.
///
/// This is overridden to return `true` for [`PartialObjectMeta<K>`], making
/// `Api<PartialObjectMeta<K>>` automatically efficient without any caller changes.
///
/// ```
/// # use kube_core::{Resource, metadata::PartialObjectMeta};
/// # use k8s_openapi::api::core::v1::Pod;
/// // Regular resources use full object requests
/// assert!(!Pod::metadata_api());
///
/// // PartialObjectMeta wraps any resource for metadata-only requests
/// assert!(PartialObjectMeta::<Pod>::metadata_api());
/// ```
///
/// [`Api`]: https://docs.rs/kube/latest/kube/struct.Api.html
/// [`PartialObjectMeta<K>`]: crate::metadata::PartialObjectMeta
fn metadata_api() -> bool {
false
}

/// Creates a url path for http requests for this resource
fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String {
let n = if let Some(ns) = namespace {
Expand Down
17 changes: 11 additions & 6 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated

use self::runner::Runner;
#[allow(deprecated)] use crate::watcher::metadata_watcher;
use crate::{
reflector::{
self, ObjectRef, reflector,
Expand All @@ -10,7 +11,7 @@ use crate::{
utils::{
Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
},
watcher::{self, DefaultBackoff, metadata_watcher, watcher},
watcher::{self, DefaultBackoff, watcher},
};
use educe::Educe;
use futures::{
Expand Down Expand Up @@ -1007,6 +1008,7 @@ where
Child::DynamicType: Debug + Eq + Hash + Clone,
{
// TODO: call owns_stream_with when it's stable
#[allow(deprecated)]
let child_watcher = trigger_owners(
metadata_watcher(api, wc).touched_objects(),
self.dyntype.clone(),
Expand Down Expand Up @@ -1712,11 +1714,14 @@ mod tests {
use crate::{
Config, Controller, applier,
reflector::{self, ObjectRef},
watcher::{self, Event, metadata_watcher, watcher},
watcher::{self, Event, watcher},
};
use futures::{Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{Api, Resource, core::ObjectMeta};
use kube_client::{
Api, Resource,
core::{ObjectMeta, PartialObjectMeta},
};
use serde::de::DeserializeOwned;
use tokio::time::timeout;

Expand Down Expand Up @@ -1755,13 +1760,13 @@ mod tests {
// not #[test] because we don't want to actually run it, we just want to
// assert that it typechecks
//
// will check return types for `watcher` and `watch_metadata` do not drift
// will check return types for `watcher` and `watcher with PartialObjectMeta` do not drift
// given an arbitrary K that implements `Resource` (e.g ConfigMap)
#[allow(dead_code, unused_must_use)]
fn test_watcher_stream_type_drift() {
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
assert_stream(metadata_watcher(
mock_type::<Api<ConfigMap>>(),
assert_stream(watcher(
mock_type::<Api<PartialObjectMeta<ConfigMap>>>(),
Default::default(),
));
}
Expand Down
Loading
Loading