Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin

[features]
default = ["client", "ring"]
rustls-tls = ["rustls", "hyper-rustls"]
rustls-tls = ["rustls", "hyper-rustls", "tokio-rustls"]
webpki-roots = ["hyper-rustls/webpki-roots"]
aws-lc-rs = ["hyper-rustls?/aws-lc-rs"]
ring = ["hyper-rustls?/ring"]
Expand Down Expand Up @@ -57,14 +57,15 @@ jiff = { workspace = true, optional = true, features = ["std", "serde"] }
pem = { workspace = true, optional = true }
openssl = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
tokio-rustls = { version = "0.26", default-features = false, optional = true }
bytes = { workspace = true, optional = true }
tokio = { workspace = true, features = ["time", "signal", "sync", "rt"], optional = true }
kube-core = { path = "../kube-core", version = "=3.1.0" }
jsonpath-rust = { workspace = true, optional = true }
tokio-util = { workspace = true, features = ["io", "codec"], optional = true }
hyper = { workspace = true, features = ["client", "http1"], optional = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "tokio", "tracing"], optional = true }
hyper-rustls = { workspace = true, features = ["http1", "logging", "native-tokio", "tls12"], optional = true }
hyper = { workspace = true, features = ["client", "http1", "http2"], optional = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "http2", "tokio", "tracing"], optional = true }
hyper-rustls = { workspace = true, features = ["http1", "http2", "logging", "native-tokio", "tls12"], optional = true }
tokio-tungstenite = { workspace = true, optional = true }
tower = { workspace = true, features = ["buffer", "filter", "util", "retry"], optional = true }
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace", "util"], optional = true }
Expand Down
229 changes: 192 additions & 37 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hyper_timeout::TimeoutConnector;

use hyper_util::{
client::legacy::connect::{Connection, HttpConnector},
rt::TokioExecutor,
rt::{TokioExecutor, TokioTimer},
};

use jiff::Timestamp;
Expand All @@ -28,6 +28,7 @@ pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send +
/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
pub struct ClientBuilder<Svc> {
service: Svc,
upgrade_service: Option<GenericService>,
default_ns: String,
valid_until: Option<Timestamp>,
}
Expand All @@ -43,20 +44,29 @@ impl<Svc> ClientBuilder<Svc> {
{
Self {
service,
upgrade_service: None,
default_ns: default_namespace.into(),
valid_until: None,
}
}

/// Add a [`Layer`] to the current [`Service`] stack.
///
/// The layer is applied to the primary [`Service`] only. If an upgrade
/// service has been set via [`with_upgrade_service`](Self::with_upgrade_service)
/// it is left untouched; users wanting to layer both must apply the
/// layer to each service themselves before calling
/// [`with_upgrade_service`].
pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
let Self {
service: stack,
upgrade_service,
default_ns,
valid_until,
} = self;
ClientBuilder {
service: layer.layer(stack),
upgrade_service,
default_ns,
valid_until,
}
Expand All @@ -66,11 +76,33 @@ impl<Svc> ClientBuilder<Svc> {
pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
ClientBuilder {
service: self.service,
upgrade_service: self.upgrade_service,
default_ns: self.default_ns,
valid_until,
}
}

/// Provide a separate [`Service`] used by the upgrade transport that
/// backs exec, attach, and port-forward.
///
/// The supplied service is the same shape as [`GenericService`], the
/// boxed service produced by the default builder stack. Custom-service
/// users that do not naturally arrive at this shape can instead call
/// [`Client::new_with_upgrade`] directly.
///
/// This is required only if the primary service may negotiate HTTP/2
/// *and* the application also uses upgrade subresources. HTTP/1.1
/// upgrades are unrepresentable on an HTTP/2 connection, so the
/// upgrade transport must offer only HTTP/1.1.
pub fn with_upgrade_service(self, upgrade_service: GenericService) -> Self {
ClientBuilder {
service: self.service,
upgrade_service: Some(upgrade_service),
default_ns: self.default_ns,
valid_until: self.valid_until,
}
}

/// Build a [`Client`] instance with the current [`Service`] stack.
pub fn build<B>(self) -> Client
where
Expand All @@ -80,7 +112,17 @@ impl<Svc> ClientBuilder<Svc> {
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
let Self {
service,
upgrade_service,
default_ns,
valid_until,
} = self;
match upgrade_service {
Some(upgrade) => Client::new_with_upgrade(service, upgrade, default_ns),
None => Client::new(service, default_ns),
}
.with_valid_until(valid_until)
}
}

Expand Down Expand Up @@ -132,10 +174,13 @@ impl TryFrom<Config> for ClientBuilder<GenericService> {
use base64::Engine;
use http::HeaderValue;

let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo));
let value = format!(
"Basic {}",
base64::engine::general_purpose::STANDARD.encode(userinfo)
);
let header = HeaderValue::from_str(&value).unwrap();
connector = connector.with_auth(header);
}
}
}

make_generic_builder(connector, config)
Expand Down Expand Up @@ -167,34 +212,152 @@ where
H::Error: 'static + Send + Sync + std::error::Error,
{
let default_ns = config.default_namespace.clone();

// Build two hyper clients with separate connection pools and ALPN policies:
// - the primary, h2-capable transport for normal REST/watch/log traffic
// (subject to `Config::disable_http2`)
// - an HTTP/1.1-only transport for the upgrade path used by exec, attach,
// and port-forward, regardless of `disable_http2`.
//
// Current TLS feature precedence when more than one is set:
// 1. rustls-tls
// 2. openssl-tls
// If neither TLS feature is enabled, the http connector is used; only the
// http scheme is supported in that case.
// Compute auth and extra-headers layers once and share across both
// transports. Calling `auth_layer()` twice would mint independent
// `RefreshableToken` state per transport, so each path would refresh
// tokens on its own and they'd diverge under exec-plugin or token-file
// auth.
Comment on lines +228 to +231
Copy link
Copy Markdown
Member

@clux clux May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really appreciate these detailed comments here.

helps the complexity go down a little more easily.

let auth_layer = config.auth_layer()?;
let extra_headers_layer = config.extra_headers_layer()?;

// The two transports use connectors with different concrete types after
// TLS wrapping (h2-capable vs explicit-h1 ALPN), so each path is built
// and wrapped independently and erased to a `GenericService` here rather
// than threading the connector type through.
let main_service = build_main_service(
connector.clone(),
&config,
auth_layer.clone(),
extra_headers_layer.clone(),
)?;
let upgrade_service = build_upgrade_service(connector, &config, auth_layer, extra_headers_layer)?;

let (_, expiration) = config.exec_identity_pem();

let client = ClientBuilder::new(main_service, default_ns)
.with_upgrade_service(upgrade_service)
.with_valid_until(expiration);

Ok(client)
}

/// Build the primary, h2-capable transport service.
///
/// Uses the dual-protocol TLS connector (rustls advertises `h2,http/1.1`
/// in ALPN; openssl currently advertises nothing -- parity work is a
/// follow-up). The hyper client carries `TokioTimer` and HTTP/2
/// keep-alive PINGs so watch streams survive idle-killing intermediaries
/// such as HAProxy.
///
/// If `Config::disable_http2` is set, falls back to building a structurally
/// identical service via [`build_upgrade_service`] -- both clients then
/// carry HTTP/1.1-only connectors but the two-client shape stays the same.
fn build_main_service<H>(
connector: H,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + Read + Write + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
if config.disable_http2 {
return build_upgrade_service(connector, config, auth_layer, extra_headers_layer);
}

let client: hyper_util::client::legacy::Client<_, Body> = {
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
{
if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
// no tls stack situation only works with http scheme
return Err(Error::TlsRequired);
}
}

let mut connector = TimeoutConnector::new(connector);
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
builder
.timer(TokioTimer::new())
.http2_keep_alive_interval(Duration::from_secs(30))
.http2_keep_alive_while_idle(true);
let client = builder.build(connector);
wrap_with_layers(client, config, auth_layer, extra_headers_layer)
}

/// Build the HTTP/1.1-only upgrade transport service.
///
/// Used by exec, attach, and port-forward; HTTP/1.1 upgrades are
/// unrepresentable on an HTTP/2 connection. The connector explicitly
/// advertises `http/1.1` in ALPN (rustls) so the server cannot pick
/// HTTP/2 at the TLS handshake.
fn build_upgrade_service<H>(
connector: H,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + Read + Write + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_http1_only_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
{
if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
return Err(Error::TlsRequired);
}
}

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write timeout here is default 295s which is a number very specific to long watches. I am not sure it makes sense for websocket calls. But I don't have a good recommendation either yet.


hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
};
let builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
let client = builder.build(connector);
wrap_with_layers(client, config, auth_layer, extra_headers_layer)
}

/// Wrap a hyper client with the standard tower layer stack (base URI, gzip,
/// auth, extra headers, tracing) and erase to a [`GenericService`].
fn wrap_with_layers<C>(
client: hyper_util::client::legacy::Client<C, Body>,
config: &Config,
auth_layer: Option<crate::client::middleware::AuthLayer>,
extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
) -> Result<GenericService, Error>
where
C: 'static + Clone + Send + Sync + Service<http::Uri>,
C::Response: 'static + Connection + Read + Write + Send + Unpin,
C::Future: 'static + Send + Unpin,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
Expand All @@ -211,7 +374,7 @@ where
let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(extra_headers_layer)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
Expand Down Expand Up @@ -263,19 +426,11 @@ where
.map_err(BoxError::from)
.service(client);

let (_, expiration) = config.exec_identity_pem();

let client = ClientBuilder::new(
service
.map_response_body(|body| {
Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
})
.boxed(),
default_ns,
)
.with_valid_until(expiration);

Ok(client)
Ok(service
.map_response_body(|body| {
Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
})
.boxed())
}

#[cfg(test)]
Expand Down
Loading
Loading