Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin

[features]
default = ["client", "ring"]
rustls-tls = ["rustls", "hyper-rustls"]
rustls-tls = ["rustls", "hyper-rustls", "tokio-rustls"]
webpki-roots = ["hyper-rustls/webpki-roots"]
aws-lc-rs = ["hyper-rustls?/aws-lc-rs"]
ring = ["hyper-rustls?/ring"]
Expand Down Expand Up @@ -58,14 +58,15 @@ jiff = { workspace = true, optional = true, features = ["std", "serde"] }
pem = { workspace = true, optional = true }
openssl = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
tokio-rustls = { version = "0.26", default-features = false, optional = true }
bytes = { workspace = true, optional = true }
tokio = { workspace = true, features = ["time", "signal", "sync", "rt"], optional = true }
kube-core = { path = "../kube-core", version = "=3.1.0" }
jsonpath-rust = { workspace = true, optional = true }
tokio-util = { workspace = true, features = ["io", "codec"], optional = true }
hyper = { workspace = true, features = ["client", "http1"], optional = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "tokio"], optional = true }
hyper-rustls = { workspace = true, features = ["http1", "logging", "native-tokio", "tls12"], optional = true }
hyper = { workspace = true, features = ["client", "http1", "http2"], optional = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "http2", "tokio"], optional = true }
hyper-rustls = { workspace = true, features = ["http1", "http2", "logging", "native-tokio", "tls12"], optional = true }
tokio-tungstenite = { workspace = true, optional = true }
tower = { workspace = true, features = ["buffer", "filter", "util", "retry"], optional = true }
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace", "util"], optional = true }
Expand Down
229 changes: 192 additions & 37 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hyper_timeout::TimeoutConnector;

use hyper_util::{
client::legacy::connect::{Connection, HttpConnector},
rt::TokioExecutor,
rt::{TokioExecutor, TokioTimer},
};

use jiff::Timestamp;
Expand All @@ -28,6 +28,7 @@ pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send +
/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
pub struct ClientBuilder<Svc> {
service: Svc,
upgrade_service: Option<GenericService>,
default_ns: String,
valid_until: Option<Timestamp>,
}
Expand All @@ -43,20 +44,29 @@ impl<Svc> ClientBuilder<Svc> {
{
Self {
service,
upgrade_service: None,
default_ns: default_namespace.into(),
valid_until: None,
}
}

/// Add a [`Layer`] to the current [`Service`] stack.
///
/// The layer is applied to the primary [`Service`] only. If an upgrade
/// service has been set via [`with_upgrade_service`](Self::with_upgrade_service)
/// it is left untouched; users wanting to layer both must apply the
/// layer to each service themselves before calling
/// [`with_upgrade_service`].
pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
let Self {
service: stack,
upgrade_service,
default_ns,
valid_until,
} = self;
ClientBuilder {
service: layer.layer(stack),
upgrade_service,
default_ns,
valid_until,
}
Expand All @@ -66,11 +76,33 @@ impl<Svc> ClientBuilder<Svc> {
pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
ClientBuilder {
service: self.service,
upgrade_service: self.upgrade_service,
default_ns: self.default_ns,
valid_until,
}
}

/// Provide a separate [`Service`] used by the upgrade transport that
/// backs exec, attach, and port-forward.
///
/// The supplied service is the same shape as [`GenericService`], the
/// boxed service produced by the default builder stack. Custom-service
/// users that do not naturally arrive at this shape can instead call
/// [`Client::new_with_upgrade`] directly.
///
/// This is required only if the primary service may negotiate HTTP/2
/// *and* the application also uses upgrade subresources. HTTP/1.1
/// upgrades are unrepresentable on an HTTP/2 connection, so the
/// upgrade transport must offer only HTTP/1.1.
pub fn with_upgrade_service(self, upgrade_service: GenericService) -> Self {
ClientBuilder {
service: self.service,
upgrade_service: Some(upgrade_service),
default_ns: self.default_ns,
valid_until: self.valid_until,
}
}

/// Build a [`Client`] instance with the current [`Service`] stack.
pub fn build<B>(self) -> Client
where
Expand All @@ -80,7 +112,17 @@ impl<Svc> ClientBuilder<Svc> {
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
let Self {
service,
upgrade_service,
default_ns,
valid_until,
} = self;
match upgrade_service {
Some(upgrade) => Client::new_with_upgrade(service, upgrade, default_ns),
None => Client::new(service, default_ns),
}
.with_valid_until(valid_until)
}
}

Expand Down Expand Up @@ -132,10 +174,13 @@ impl TryFrom<Config> for ClientBuilder<GenericService> {
use base64::Engine;
use http::HeaderValue;

let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo));
let value = format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(userinfo)
);
let header = HeaderValue::from_str(&value).unwrap();
connector = connector.with_auth(header);
}
}
}

make_generic_builder(connector, config)
Expand Down Expand Up @@ -167,34 +212,152 @@ where
H::Error: 'static + Send + Sync + std::error::Error,
{
let default_ns = config.default_namespace.clone();

// Build two hyper clients with separate connection pools and ALPN policies:
// - the primary, h2-capable transport for normal REST/watch/log traffic
// (subject to `Config::disable_http2`)
// - an HTTP/1.1-only transport for the upgrade path used by exec, attach,
// and port-forward, regardless of `disable_http2`.
//
// Current TLS feature precedence when more than one is set:
// 1. rustls-tls
// 2. openssl-tls
// If neither TLS feature is enabled, the http connector is used; only the
// http scheme is supported in that case.
// Compute auth and extra-headers layers once and share across both
// transports. Calling `auth_layer()` twice would mint independent
// `RefreshableToken` state per transport, so each path would refresh
// tokens on its own and they'd diverge under exec-plugin or token-file
// auth.
Comment on lines +228 to +231
Copy link
Copy Markdown
Member

@clux clux May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really appreciate these detailed comments here.

helps the complexity go down a little more easily.

let auth_layer = config.auth_layer()?;
let extra_headers_layer = config.extra_headers_layer()?;

// The two transports use connectors with different concrete types after
// TLS wrapping (h2-capable vs explicit-h1 ALPN), so each path is built
// and wrapped independently and erased to a `GenericService` here rather
// than threading the connector type through.
let main_service = build_main_service(
connector.clone(),
&config,
auth_layer.clone(),
extra_headers_layer.clone(),
)?;
let upgrade_service = build_upgrade_service(connector, &config, auth_layer, extra_headers_layer)?;

let (_, expiration) = config.exec_identity_pem();

let client = ClientBuilder::new(main_service, default_ns)
.with_upgrade_service(upgrade_service)
.with_valid_until(expiration);

Ok(client)
}

/// Build the primary, h2-capable transport service.
///
/// Uses the dual-protocol TLS connector (rustls advertises `h2,http/1.1`
/// in ALPN; openssl currently advertises nothing -- parity work is a
/// follow-up). The hyper client carries `TokioTimer` and HTTP/2
/// keep-alive PINGs so watch streams survive idle-killing intermediaries
/// such as HAProxy.
///
/// If `Config::disable_http2` is set, falls back to building a structurally
/// identical service via [`build_upgrade_service`] -- both clients then
/// carry HTTP/1.1-only connectors but the two-client shape stays the same.
fn build_main_service<H>(
connector: H,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + Read + Write + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
if config.disable_http2 {
return build_upgrade_service(connector, config, auth_layer, extra_headers_layer);
}

let client: hyper_util::client::legacy::Client<_, Body> = {
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
{
if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
// no tls stack situation only works with http scheme
return Err(Error::TlsRequired);
}
}

let mut connector = TimeoutConnector::new(connector);
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
builder
.timer(TokioTimer::new())
.http2_keep_alive_interval(Duration::from_secs(30))
.http2_keep_alive_while_idle(true);
let client = builder.build(connector);
wrap_with_layers(client, config, auth_layer, extra_headers_layer)
}

/// Build the HTTP/1.1-only upgrade transport service.
///
/// Used by exec, attach, and port-forward; HTTP/1.1 upgrades are
/// unrepresentable on an HTTP/2 connection. The connector explicitly
/// advertises `http/1.1` in ALPN (rustls) so the server cannot pick
/// HTTP/2 at the TLS handshake.
fn build_upgrade_service<H>(
connector: H,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + Read + Write + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_http1_only_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
{
if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
return Err(Error::TlsRequired);
}
}

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write timeout here is default 295s which is a number very specific to long watches. I am not sure it makes sense for websocket calls. But I don't have a good recommendation either yet.


hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
};
let builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
let client = builder.build(connector);
wrap_with_layers(client, config, auth_layer, extra_headers_layer)
}

/// Wrap a hyper client with the standard tower layer stack (base URI, gzip,
/// auth, extra headers, tracing) and erase to a [`GenericService`].
fn wrap_with_layers<C>(
client: hyper_util::client::legacy::Client<C, Body>,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
C: 'static + Clone + Send + Sync + Service<http::Uri>,
C::Response: 'static + Connection + Read + Write + Send + Unpin,
C::Future: 'static + Send + Unpin,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
Expand All @@ -211,7 +374,7 @@ where
let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(extra_headers_layer)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
Expand Down Expand Up @@ -263,19 +426,11 @@ where
.map_err(BoxError::from)
.service(client);

let (_, expiration) = config.exec_identity_pem();

let client = ClientBuilder::new(
service
.map_response_body(|body| {
Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
})
.boxed(),
default_ns,
)
.with_valid_until(expiration);

Ok(client)
Ok(service
.map_response_body(|body| {
Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
})
.boxed())
}

#[cfg(test)]
Expand Down
Loading
Loading