Skip to content

Commit 0f0cb93

Browse files
doxxx93clux
andauthored
Api<PartialObjectMeta<K>> should opportunistically degrade to metadata requests (#1952)
* feat(kube-core): add metadata_api() to Resource trait Add `fn metadata_api() -> bool` to the `Resource` trait (default false), overridden to return true for `PartialObjectMeta<K>`. This allows downstream code to detect metadata-only types at compile time and automatically switch to efficient metadata-optimized API requests. Ref: #1614 Signed-off-by: doxxx93 <doxxx93@gmail.com> * feat(kube-client): auto-use metadata headers for PartialObjectMeta Api methods get, list, watch, and patch now branch on Resource::metadata_api() to automatically use metadata-optimized Accept headers when K = PartialObjectMeta<_>, so the API server returns only metadata instead of the full object. Ref: #1614 Signed-off-by: doxxx93 <doxxx93@gmail.com> * refactor(kube-runtime): deprecate metadata_watcher With Api<PartialObjectMeta<K>> now automatically using metadata-only requests, metadata_watcher is no longer needed. Users can use watcher(Api::<PartialObjectMeta<K>>::all(client), config) instead. Simplify the dynamic_watcher example to remove the runtime branching and focus on dynamic resource watching. Ref: #1614 Signed-off-by: doxxx93 <doxxx93@gmail.com> * test(kube-client): verify PartialObjectMeta uses metadata headers Add mock-based tests verifying that Api<PartialObjectMeta<Pod>> sends the correct metadata-only Accept headers for get, list, watch, and patch operations. Ref: #1614 Signed-off-by: doxxx93 <doxxx93@gmail.com> * refactor: cache metadata_api on Api to avoid tightening method bounds PR review feedback (clux): the original PR added `K: Resource` to `impl<K> Api<K>` in core_methods.rs (so it could call `K::metadata_api()` per request), which forced the same bound onto `ApiMode for FullObject<K>` and `ApiMode for MetaOnly<K>` in watcher.rs. This goes against the spirit of #1393 by tightening method-level bounds beyond what the corresponding Api/watcher constructors already require. Cache the flag on `Api<K>` itself in the existing `impl<K: Resource> Api<K>` constructors, then read `self.metadata_api` from the per-method code paths. The Resource bound stays exactly where it was before this PR (Api constructors and the public `watcher`/`metadata_watcher` signatures), no user-visible API change. See #1952 (comment) Signed-off-by: doxxx93 <doxxx93@gmail.com> --------- Signed-off-by: doxxx93 <doxxx93@gmail.com> Co-authored-by: Eirik A <sszynrae@gmail.com>
1 parent 67e4671 commit 0f0cb93

9 files changed

Lines changed: 208 additions & 37 deletions

File tree

examples/dynamic_watcher.rs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
1-
use futures::{Stream, StreamExt, TryStreamExt};
1+
use futures::{StreamExt, TryStreamExt};
22
use kube::{
3-
api::{Api, ApiResource, DynamicObject, GroupVersionKind, Resource, ResourceExt},
4-
runtime::{WatchStreamExt, metadata_watcher, watcher, watcher::Event},
3+
api::{Api, DynamicObject, GroupVersionKind, ResourceExt},
4+
runtime::{WatchStreamExt, watcher},
55
};
6-
use serde::de::DeserializeOwned;
76
use tracing::*;
87

9-
use std::{env, fmt::Debug};
8+
use std::env;
109

1110
#[tokio::main]
1211
async fn main() -> anyhow::Result<()> {
1312
tracing_subscriber::fmt::init();
1413
let client = kube::Client::try_default().await?;
1514

16-
// If set will receive only the metadata for watched resources
17-
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);
18-
1915
// Take dynamic resource identifiers:
2016
let group = env::var("GROUP").unwrap_or_else(|_| "".into());
2117
let version = env::var("VERSION").unwrap_or_else(|_| "v1".into());
@@ -28,28 +24,15 @@ async fn main() -> anyhow::Result<()> {
2824

2925
// Use the full resource info to create an Api with the ApiResource as its DynamicType
3026
let api = Api::<DynamicObject>::all_with(client, &ar);
31-
let wc = watcher::Config::default();
32-
33-
// Start a metadata or a full resource watch
34-
if watch_metadata {
35-
handle_events(metadata_watcher(api, wc), &ar).await
36-
} else {
37-
handle_events(watcher(api, wc), &ar).await
38-
}
39-
}
4027

41-
async fn handle_events<
42-
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + DeserializeOwned + 'static,
43-
>(
44-
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
45-
ar: &ApiResource,
46-
) -> anyhow::Result<()> {
47-
let mut items = stream.applied_objects().boxed();
28+
// For metadata-only watching, use Api::<PartialObjectMeta<DynamicObject>> instead.
29+
// PartialObjectMeta-based Api automatically uses efficient metadata-only requests.
30+
let mut items = watcher(api, watcher::Config::default()).applied_objects().boxed();
4831
while let Some(p) = items.try_next().await? {
4932
if let Some(ns) = p.namespace() {
50-
info!("saw {} {} in {ns}", K::kind(ar), p.name_any());
33+
info!("saw {kind} {} in {ns}", p.name_any());
5134
} else {
52-
info!("saw {} {}", K::kind(ar), p.name_any());
35+
info!("saw {kind} {}", p.name_any());
5336
}
5437
trace!("full obj: {p:?}");
5538
}

kube-client/src/api/core_methods.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,12 @@ where
8080
/// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
8181
/// Consider using [`Api::get_opt`] if you need to handle missing objects.
8282
pub async fn get_with(&self, name: &str, gp: &GetParams) -> Result<K> {
83-
let mut req = self.request.get(name, gp).map_err(Error::BuildRequest)?;
83+
let mut req = if self.metadata_api {
84+
self.request.get_metadata(name, gp)
85+
} else {
86+
self.request.get(name, gp)
87+
}
88+
.map_err(Error::BuildRequest)?;
8489
req.extensions_mut().insert("get");
8590
self.client.request::<K>(req).await
8691
}
@@ -215,7 +220,12 @@ where
215220
/// # }
216221
/// ```
217222
pub async fn list(&self, lp: &ListParams) -> Result<ObjectList<K>> {
218-
let mut req = self.request.list(lp).map_err(Error::BuildRequest)?;
223+
let mut req = if self.metadata_api {
224+
self.request.list_metadata(lp)
225+
} else {
226+
self.request.list(lp)
227+
}
228+
.map_err(Error::BuildRequest)?;
219229
req.extensions_mut().insert("list");
220230
self.client.request::<ObjectList<K>>(req).await
221231
}
@@ -381,7 +391,12 @@ where
381391
pp: &PatchParams,
382392
patch: &Patch<P>,
383393
) -> Result<K> {
384-
let mut req = self.request.patch(name, pp, patch).map_err(Error::BuildRequest)?;
394+
let mut req = if self.metadata_api {
395+
self.request.patch_metadata(name, pp, patch)
396+
} else {
397+
self.request.patch(name, pp, patch)
398+
}
399+
.map_err(Error::BuildRequest)?;
385400
req.extensions_mut().insert("patch");
386401
self.client.request::<K>(req).await
387402
}
@@ -538,7 +553,12 @@ where
538553
wp: &WatchParams,
539554
version: &str,
540555
) -> Result<impl Stream<Item = Result<WatchEvent<K>>> + use<K>> {
541-
let mut req = self.request.watch(wp, version).map_err(Error::BuildRequest)?;
556+
let mut req = if self.metadata_api {
557+
self.request.watch_metadata(wp, version)
558+
} else {
559+
self.request.watch(wp, version)
560+
}
561+
.map_err(Error::BuildRequest)?;
542562
req.extensions_mut().insert("watch");
543563
self.client.request_events::<K>(req).await
544564
}

kube-client/src/api/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ pub struct Api<K> {
5454
/// The client to use (from this library)
5555
pub(crate) client: Client,
5656
namespace: Option<String>,
57+
/// Whether requests should use metadata-only Accept headers
58+
/// (cached from `K::metadata_api()` at construction so that `impl<K> Api<K>`
59+
/// method blocks don't have to tighten to `K: Resource`).
60+
pub(crate) metadata_api: bool,
5761
/// Note: Using `iter::Empty` over `PhantomData`, because we never actually keep any
5862
/// `K` objects, so `Empty` better models our constraints (in particular, `Empty<K>`
5963
/// is `Send`, even if `K` may not be).
@@ -83,6 +87,7 @@ impl<K: Resource> Api<K> {
8387
client,
8488
request: Request::new(url),
8589
namespace: None,
90+
metadata_api: K::metadata_api(),
8691
_phantom: std::iter::empty(),
8792
}
8893
}
@@ -100,6 +105,7 @@ impl<K: Resource> Api<K> {
100105
client,
101106
request: Request::new(url),
102107
namespace: Some(ns.to_string()),
108+
metadata_api: K::metadata_api(),
103109
_phantom: std::iter::empty(),
104110
}
105111
}
@@ -194,6 +200,7 @@ where
194200
client,
195201
request: Request::new(url),
196202
namespace: Some(ns.to_string()),
203+
metadata_api: K::metadata_api(),
197204
_phantom: std::iter::empty(),
198205
}
199206
}
@@ -241,6 +248,7 @@ impl<K> Debug for Api<K> {
241248
request,
242249
client: _,
243250
namespace,
251+
metadata_api: _,
244252
_phantom,
245253
} = self;
246254
f.debug_struct("Api")

kube-client/src/client/mod.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ mod tests {
594594

595595
use http::{Request, Response};
596596
use k8s_openapi::api::core::v1::Pod;
597+
use kube_core::metadata::PartialObjectMeta;
597598
use tower_test::mock;
598599

599600
#[tokio::test]
@@ -665,4 +666,115 @@ mod tests {
665666
assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
666667
spawned.await.unwrap();
667668
}
669+
670+
#[tokio::test]
671+
async fn test_partial_object_meta_get_uses_metadata_header() {
672+
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
673+
let spawned = tokio::spawn(async move {
674+
let mut handle = pin!(handle);
675+
let (request, send) = handle.next_request().await.expect("service not called");
676+
// Verify the metadata-only Accept header is set
677+
assert_eq!(
678+
request.headers().get(http::header::ACCEPT).unwrap(),
679+
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
680+
);
681+
let pod_meta: PartialObjectMeta<Pod> =
682+
serde_json::from_value(serde_json::json!({
683+
"apiVersion": "meta.k8s.io/v1",
684+
"kind": "PartialObjectMetadata",
685+
"metadata": {
686+
"name": "test",
687+
},
688+
}))
689+
.unwrap();
690+
send.send_response(Response::new(Body::from(
691+
serde_json::to_vec(&pod_meta).unwrap(),
692+
)));
693+
});
694+
695+
let pods: Api<PartialObjectMeta<Pod>> =
696+
Api::default_namespaced(Client::new(mock_service, "default"));
697+
let pod = pods.get("test").await.unwrap();
698+
assert_eq!(pod.metadata.name, Some("test".to_string()));
699+
spawned.await.unwrap();
700+
}
701+
702+
#[tokio::test]
703+
async fn test_partial_object_meta_list_uses_metadata_header() {
704+
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
705+
let spawned = tokio::spawn(async move {
706+
let mut handle = pin!(handle);
707+
let (request, send) = handle.next_request().await.expect("service not called");
708+
assert_eq!(
709+
request.headers().get(http::header::ACCEPT).unwrap(),
710+
"application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1"
711+
);
712+
send.send_response(Response::new(Body::from(
713+
serde_json::to_vec(&serde_json::json!({
714+
"apiVersion": "meta.k8s.io/v1",
715+
"kind": "PartialObjectMetadataList",
716+
"metadata": { "resourceVersion": "1" },
717+
"items": [],
718+
}))
719+
.unwrap(),
720+
)));
721+
});
722+
723+
let pods: Api<PartialObjectMeta<Pod>> =
724+
Api::default_namespaced(Client::new(mock_service, "default"));
725+
let list = pods.list(&Default::default()).await.unwrap();
726+
assert_eq!(list.items.len(), 0);
727+
spawned.await.unwrap();
728+
}
729+
730+
#[tokio::test]
731+
async fn test_partial_object_meta_patch_uses_metadata_header() {
732+
use kube_core::params::{Patch, PatchParams};
733+
734+
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
735+
let spawned = tokio::spawn(async move {
736+
let mut handle = pin!(handle);
737+
let (request, send) = handle.next_request().await.expect("service not called");
738+
assert_eq!(request.method(), http::Method::PATCH);
739+
assert_eq!(
740+
request.headers().get(http::header::ACCEPT).unwrap(),
741+
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
742+
);
743+
send.send_response(Response::new(Body::from(
744+
serde_json::to_vec(&serde_json::json!({
745+
"apiVersion": "meta.k8s.io/v1",
746+
"kind": "PartialObjectMetadata",
747+
"metadata": { "name": "test" },
748+
}))
749+
.unwrap(),
750+
)));
751+
});
752+
753+
let pods: Api<PartialObjectMeta<Pod>> =
754+
Api::default_namespaced(Client::new(mock_service, "default"));
755+
let patch = Patch::Merge(serde_json::json!({ "metadata": { "labels": { "a": "b" } } }));
756+
let pod = pods.patch("test", &PatchParams::default(), &patch).await.unwrap();
757+
assert_eq!(pod.metadata.name, Some("test".to_string()));
758+
spawned.await.unwrap();
759+
}
760+
761+
#[tokio::test]
762+
async fn test_partial_object_meta_watch_uses_metadata_header() {
763+
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
764+
let spawned = tokio::spawn(async move {
765+
let mut handle = pin!(handle);
766+
let (request, send) = handle.next_request().await.expect("service not called");
767+
assert_eq!(
768+
request.headers().get(http::header::ACCEPT).unwrap(),
769+
"application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1"
770+
);
771+
// Send an empty response to close the stream
772+
send.send_response(Response::new(Body::from(vec![])));
773+
});
774+
775+
let pods: Api<PartialObjectMeta<Pod>> =
776+
Api::default_namespaced(Client::new(mock_service, "default"));
777+
let _ = pods.watch(&Default::default(), "0").await;
778+
spawned.await.unwrap();
779+
}
668780
}

kube-core/src/metadata.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
150150
type DynamicType = K::DynamicType;
151151
type Scope = K::Scope;
152152

153+
fn metadata_api() -> bool {
154+
true
155+
}
156+
153157
fn kind(dt: &Self::DynamicType) -> Cow<'_, str> {
154158
K::kind(dt)
155159
}
@@ -207,4 +211,12 @@ mod test {
207211
assert_eq!(response_pom.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
208212
assert_eq!(response_pom.types.as_ref().unwrap().kind, "PartialObjectMetadata");
209213
}
214+
215+
#[test]
216+
fn metadata_api_flag() {
217+
// PartialObjectMeta should signal metadata-only API requests
218+
assert!(PartialObjectMeta::<Pod>::metadata_api());
219+
// Regular resources should not
220+
assert!(!Pod::metadata_api());
221+
}
210222
}

kube-core/src/resource.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,32 @@ pub trait Resource {
5454
/// This is known as the resource in apimachinery, we rename it for disambiguation.
5555
fn plural(dt: &Self::DynamicType) -> Cow<'_, str>;
5656

57+
/// Returns whether this type should use metadata-only API requests
58+
///
59+
/// When `true`, [`Api`] methods like `get`, `list`, `watch`, and `patch` will
60+
/// automatically use metadata-optimized Accept headers (e.g.
61+
/// `application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1`), so the API
62+
/// server only returns object metadata instead of the full object.
63+
///
64+
/// This is overridden to return `true` for [`PartialObjectMeta<K>`], making
65+
/// `Api<PartialObjectMeta<K>>` automatically efficient without any caller changes.
66+
///
67+
/// ```
68+
/// # use kube_core::{Resource, metadata::PartialObjectMeta};
69+
/// # use k8s_openapi::api::core::v1::Pod;
70+
/// // Regular resources use full object requests
71+
/// assert!(!Pod::metadata_api());
72+
///
73+
/// // PartialObjectMeta wraps any resource for metadata-only requests
74+
/// assert!(PartialObjectMeta::<Pod>::metadata_api());
75+
/// ```
76+
///
77+
/// [`Api`]: https://docs.rs/kube/latest/kube/struct.Api.html
78+
/// [`PartialObjectMeta<K>`]: crate::metadata::PartialObjectMeta
79+
fn metadata_api() -> bool {
80+
false
81+
}
82+
5783
/// Creates a url path for http requests for this resource
5884
fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String {
5985
let n = if let Some(ns) = namespace {

kube-runtime/src/controller/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
22
33
use self::runner::Runner;
4+
#[allow(deprecated)] use crate::watcher::metadata_watcher;
45
use crate::{
56
reflector::{
67
self, ObjectRef, reflector,
@@ -10,7 +11,7 @@ use crate::{
1011
utils::{
1112
Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, trystream_try_via,
1213
},
13-
watcher::{self, DefaultBackoff, metadata_watcher, watcher},
14+
watcher::{self, DefaultBackoff, watcher},
1415
};
1516
use educe::Educe;
1617
use futures::{
@@ -1007,6 +1008,7 @@ where
10071008
Child::DynamicType: Debug + Eq + Hash + Clone,
10081009
{
10091010
// TODO: call owns_stream_with when it's stable
1011+
#[allow(deprecated)]
10101012
let child_watcher = trigger_owners(
10111013
metadata_watcher(api, wc).touched_objects(),
10121014
self.dyntype.clone(),
@@ -1712,11 +1714,14 @@ mod tests {
17121714
use crate::{
17131715
Config, Controller, applier,
17141716
reflector::{self, ObjectRef},
1715-
watcher::{self, Event, metadata_watcher, watcher},
1717+
watcher::{self, Event, watcher},
17161718
};
17171719
use futures::{Stream, StreamExt, TryStreamExt};
17181720
use k8s_openapi::api::core::v1::ConfigMap;
1719-
use kube_client::{Api, Resource, core::ObjectMeta};
1721+
use kube_client::{
1722+
Api, Resource,
1723+
core::{ObjectMeta, PartialObjectMeta},
1724+
};
17201725
use serde::de::DeserializeOwned;
17211726
use tokio::time::timeout;
17221727

@@ -1755,13 +1760,13 @@ mod tests {
17551760
// not #[test] because we don't want to actually run it, we just want to
17561761
// assert that it typechecks
17571762
//
1758-
// will check return types for `watcher` and `watch_metadata` do not drift
1763+
// will check return types for `watcher` and `watcher with PartialObjectMeta` do not drift
17591764
// given an arbitrary K that implements `Resource` (e.g ConfigMap)
17601765
#[allow(dead_code, unused_must_use)]
17611766
fn test_watcher_stream_type_drift() {
17621767
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
1763-
assert_stream(metadata_watcher(
1764-
mock_type::<Api<ConfigMap>>(),
1768+
assert_stream(watcher(
1769+
mock_type::<Api<PartialObjectMeta<ConfigMap>>>(),
17651770
Default::default(),
17661771
));
17671772
}

0 commit comments

Comments
 (0)