diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index e10433626..c5b922487 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -13,7 +13,7 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin [features] default = ["client", "ring"] -rustls-tls = ["rustls", "hyper-rustls"] +rustls-tls = ["rustls", "hyper-rustls", "tokio-rustls"] webpki-roots = ["hyper-rustls/webpki-roots"] aws-lc-rs = ["hyper-rustls?/aws-lc-rs"] ring = ["hyper-rustls?/ring"] @@ -58,14 +58,15 @@ jiff = { workspace = true, optional = true, features = ["std", "serde"] } pem = { workspace = true, optional = true } openssl = { workspace = true, optional = true } rustls = { workspace = true, optional = true } +tokio-rustls = { version = "0.26", default-features = false, optional = true } bytes = { workspace = true, optional = true } tokio = { workspace = true, features = ["time", "signal", "sync", "rt"], optional = true } kube-core = { path = "../kube-core", version = "=3.1.0" } jsonpath-rust = { workspace = true, optional = true } tokio-util = { workspace = true, features = ["io", "codec"], optional = true } -hyper = { workspace = true, features = ["client", "http1"], optional = true } -hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "tokio"], optional = true } -hyper-rustls = { workspace = true, features = ["http1", "logging", "native-tokio", "tls12"], optional = true } +hyper = { workspace = true, features = ["client", "http1", "http2"], optional = true } +hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "http2", "tokio"], optional = true } +hyper-rustls = { workspace = true, features = ["http1", "http2", "logging", "native-tokio", "tls12"], optional = true } tokio-tungstenite = { workspace = true, optional = true } tower = { workspace = true, features = ["buffer", "filter", "util", "retry"], optional = true } tower-http = { workspace = true, features = ["auth", "map-response-body", "trace", "util"], optional = true } diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index eeb55cd59..2643a0737 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -8,7 +8,7 @@ use hyper_timeout::TimeoutConnector; use hyper_util::{ client::legacy::connect::{Connection, HttpConnector}, - rt::TokioExecutor, + rt::{TokioExecutor, TokioTimer}, }; use jiff::Timestamp; @@ -28,6 +28,7 @@ pub type DynBody = dyn http_body::Body + Send + /// Builder for [`Client`] instances with customized [tower](`Service`) middleware. pub struct ClientBuilder { service: Svc, + upgrade_service: Option, default_ns: String, valid_until: Option, } @@ -43,20 +44,29 @@ impl ClientBuilder { { Self { service, + upgrade_service: None, default_ns: default_namespace.into(), valid_until: None, } } /// Add a [`Layer`] to the current [`Service`] stack. + /// + /// The layer is applied to the primary [`Service`] only. If an upgrade + /// service has been set via [`with_upgrade_service`](Self::with_upgrade_service) + /// it is left untouched; users wanting to layer both must apply the + /// layer to each service themselves before calling + /// [`with_upgrade_service`]. pub fn with_layer>(self, layer: &L) -> ClientBuilder { let Self { service: stack, + upgrade_service, default_ns, valid_until, } = self; ClientBuilder { service: layer.layer(stack), + upgrade_service, default_ns, valid_until, } @@ -66,11 +76,33 @@ impl ClientBuilder { pub fn with_valid_until(self, valid_until: Option) -> Self { ClientBuilder { service: self.service, + upgrade_service: self.upgrade_service, default_ns: self.default_ns, valid_until, } } + /// Provide a separate [`Service`] used by the upgrade transport that + /// backs exec, attach, and port-forward. + /// + /// The supplied service is the same shape as [`GenericService`], the + /// boxed service produced by the default builder stack. Custom-service + /// users that do not naturally arrive at this shape can instead call + /// [`Client::new_with_upgrade`] directly. + /// + /// This is required only if the primary service may negotiate HTTP/2 + /// *and* the application also uses upgrade subresources. HTTP/1.1 + /// upgrades are unrepresentable on an HTTP/2 connection, so the + /// upgrade transport must offer only HTTP/1.1. + pub fn with_upgrade_service(self, upgrade_service: GenericService) -> Self { + ClientBuilder { + service: self.service, + upgrade_service: Some(upgrade_service), + default_ns: self.default_ns, + valid_until: self.valid_until, + } + } + /// Build a [`Client`] instance with the current [`Service`] stack. pub fn build(self) -> Client where @@ -80,7 +112,17 @@ impl ClientBuilder { B: http_body::Body + Send + 'static, B::Error: Into, { - Client::new(self.service, self.default_ns).with_valid_until(self.valid_until) + let Self { + service, + upgrade_service, + default_ns, + valid_until, + } = self; + match upgrade_service { + Some(upgrade) => Client::new_with_upgrade(service, upgrade, default_ns), + None => Client::new(service, default_ns), + } + .with_valid_until(valid_until) } } @@ -132,10 +174,13 @@ impl TryFrom for ClientBuilder { use base64::Engine; use http::HeaderValue; - let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo)); + let value = format!( + "Basic {}", + base64::engine::general_purpose::STANDARD.encode(userinfo) + ); let header = HeaderValue::from_str(&value).unwrap(); connector = connector.with_auth(header); - } + } } make_generic_builder(connector, config) @@ -167,34 +212,152 @@ where H::Error: 'static + Send + Sync + std::error::Error, { let default_ns = config.default_namespace.clone(); + + // Build two hyper clients with separate connection pools and ALPN policies: + // - the primary, h2-capable transport for normal REST/watch/log traffic + // (subject to `Config::disable_http2`) + // - an HTTP/1.1-only transport for the upgrade path used by exec, attach, + // and port-forward, regardless of `disable_http2`. + // + // Current TLS feature precedence when more than one is set: + // 1. rustls-tls + // 2. openssl-tls + // If neither TLS feature is enabled, the http connector is used; only the + // http scheme is supported in that case. + // Compute auth and extra-headers layers once and share across both + // transports. Calling `auth_layer()` twice would mint independent + // `RefreshableToken` state per transport, so each path would refresh + // tokens on its own and they'd diverge under exec-plugin or token-file + // auth. let auth_layer = config.auth_layer()?; + let extra_headers_layer = config.extra_headers_layer()?; + + // The two transports use connectors with different concrete types after + // TLS wrapping (h2-capable vs explicit-h1 ALPN), so each path is built + // and wrapped independently and erased to a `GenericService` here rather + // than threading the connector type through. + let main_service = build_main_service( + connector.clone(), + &config, + auth_layer.clone(), + extra_headers_layer.clone(), + )?; + let upgrade_service = build_upgrade_service(connector, &config, auth_layer, extra_headers_layer)?; + + let (_, expiration) = config.exec_identity_pem(); + + let client = ClientBuilder::new(main_service, default_ns) + .with_upgrade_service(upgrade_service) + .with_valid_until(expiration); + + Ok(client) +} + +/// Build the primary, h2-capable transport service. +/// +/// Uses the dual-protocol TLS connector (rustls advertises `h2,http/1.1` +/// in ALPN; openssl currently advertises nothing -- parity work is a +/// follow-up). The hyper client carries `TokioTimer` and HTTP/2 +/// keep-alive PINGs so watch streams survive idle-killing intermediaries +/// such as HAProxy. +/// +/// If `Config::disable_http2` is set, falls back to building a structurally +/// identical service via [`build_upgrade_service`] -- both clients then +/// carry HTTP/1.1-only connectors but the two-client shape stays the same. +fn build_main_service( + connector: H, + config: &Config, + auth_layer: Option, + extra_headers_layer: crate::client::middleware::ExtraHeadersLayer, +) -> Result +where + H: 'static + Clone + Send + Sync + Service, + H::Response: 'static + Connection + Read + Write + Send + Unpin, + H::Future: 'static + Send, + H::Error: 'static + Send + Sync + std::error::Error, +{ + if config.disable_http2 { + return build_upgrade_service(connector, config, auth_layer, extra_headers_layer); + } - let client: hyper_util::client::legacy::Client<_, Body> = { - // Current TLS feature precedence when more than one are set: - // 1. rustls-tls - // 2. openssl-tls - // Create a custom client to use something else. - // If TLS features are not enabled, http connector will be used. - #[cfg(feature = "rustls-tls")] - let connector = config.rustls_https_connector_with_connector(connector)?; - #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))] - let connector = config.openssl_https_connector_with_connector(connector)?; - #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))] + #[cfg(feature = "rustls-tls")] + let connector = config.rustls_https_connector_with_connector(connector)?; + #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))] + let connector = config.openssl_https_connector_with_connector(connector)?; + #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))] + { if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) { - // no tls stack situation only works with http scheme return Err(Error::TlsRequired); } + } - let mut connector = TimeoutConnector::new(connector); + let mut connector = TimeoutConnector::new(connector); + connector.set_connect_timeout(config.connect_timeout); + connector.set_read_timeout(config.read_timeout); + connector.set_write_timeout(config.write_timeout); + + let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new()); + builder + .timer(TokioTimer::new()) + .http2_keep_alive_interval(Duration::from_secs(30)) + .http2_keep_alive_while_idle(true); + let client = builder.build(connector); + wrap_with_layers(client, config, auth_layer, extra_headers_layer) +} + +/// Build the HTTP/1.1-only upgrade transport service. +/// +/// Used by exec, attach, and port-forward; HTTP/1.1 upgrades are +/// unrepresentable on an HTTP/2 connection. The connector explicitly +/// advertises `http/1.1` in ALPN (rustls) so the server cannot pick +/// HTTP/2 at the TLS handshake. +fn build_upgrade_service( + connector: H, + config: &Config, + auth_layer: Option, + extra_headers_layer: crate::client::middleware::ExtraHeadersLayer, +) -> Result +where + H: 'static + Clone + Send + Sync + Service, + H::Response: 'static + Connection + Read + Write + Send + Unpin, + H::Future: 'static + Send, + H::Error: 'static + Send + Sync + std::error::Error, +{ + #[cfg(feature = "rustls-tls")] + let connector = config.rustls_https_connector_http1_only_with_connector(connector)?; + #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))] + let connector = config.openssl_https_connector_with_connector(connector)?; + #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))] + { + if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) { + return Err(Error::TlsRequired); + } + } - // Set the timeouts for the client - connector.set_connect_timeout(config.connect_timeout); - connector.set_read_timeout(config.read_timeout); - connector.set_write_timeout(config.write_timeout); + let mut connector = TimeoutConnector::new(connector); + connector.set_connect_timeout(config.connect_timeout); + connector.set_read_timeout(config.read_timeout); + connector.set_write_timeout(config.write_timeout); - hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector) - }; + let builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new()); + let client = builder.build(connector); + wrap_with_layers(client, config, auth_layer, extra_headers_layer) +} +/// Wrap a hyper client with the standard tower layer stack (base URI, gzip, +/// auth, extra headers, tracing) and erase to a [`GenericService`]. +fn wrap_with_layers( + client: hyper_util::client::legacy::Client, + config: &Config, + auth_layer: Option, + extra_headers_layer: crate::client::middleware::ExtraHeadersLayer, +) -> Result +where + C: 'static + Clone + Send + Sync + Service, + C::Response: 'static + Connection + Read + Write + Send + Unpin, + C::Future: 'static + Send + Unpin, + C::Error: Into>, +{ let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner(); #[cfg(feature = "gzip")] let stack = ServiceBuilder::new() @@ -211,7 +374,7 @@ where let service = ServiceBuilder::new() .layer(stack) .option_layer(auth_layer) - .layer(config.extra_headers_layer()?) + .layer(extra_headers_layer) .layer( // Attribute names follow [Semantic Conventions]. // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md @@ -263,19 +426,11 @@ where .map_err(BoxError::from) .service(client); - let (_, expiration) = config.exec_identity_pem(); - - let client = ClientBuilder::new( - service - .map_response_body(|body| { - Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box - }) - .boxed(), - default_ns, - ) - .with_valid_until(expiration); - - Ok(client) + Ok(service + .map_response_body(|body| { + Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box + }) + .boxed()) } #[cfg(test)] diff --git a/kube-client/src/client/config_ext.rs b/kube-client/src/client/config_ext.rs index bcb68e937..71765eabe 100644 --- a/kube-client/src/client/config_ext.rs +++ b/kube-client/src/client/config_ext.rs @@ -70,6 +70,26 @@ pub trait ConfigExt: private::Sealed { connector: H, ) -> Result>; + /// Create an HTTP/1.1-only HTTPS connector based on config and `connector`. + /// + /// Like [`rustls_https_connector_with_connector`](Self::rustls_https_connector_with_connector) + /// but advertises only `http/1.1` in ALPN. This is the connector used by + /// the upgrade transport that backs exec, attach, and port-forward, which + /// require an HTTP/1.1 connection because HTTP/1.1 upgrades are + /// unrepresentable on an HTTP/2 connection. + /// + /// Note: the return type is a kube-rs-internal connector rather than + /// `hyper_rustls::HttpsConnector` because hyper-rustls' builder cannot + /// produce an `http/1.1`-only ALPN advertisement (see + /// [`tls::rustls_tls::H1OnlyHttpsConnector`](crate::client::tls::rustls_tls) + /// for why). + #[cfg_attr(docsrs, doc(cfg(feature = "rustls-tls")))] + #[cfg(feature = "rustls-tls")] + fn rustls_https_connector_http1_only_with_connector( + &self, + connector: H, + ) -> Result>; + /// Create [`rustls::ClientConfig`] based on config. /// # Example /// @@ -247,23 +267,27 @@ impl ConfigExt for Config { &self, connector: H, ) -> Result> { - use hyper_rustls::FixedServerNameResolver; - - use crate::client::tls::rustls_tls; + let builder = self.rustls_https_connector_builder()?; + Ok(builder.enable_http1().enable_http2().wrap_connector(connector)) + } + #[cfg(feature = "rustls-tls")] + fn rustls_https_connector_http1_only_with_connector( + &self, + connector: H, + ) -> Result> { + use crate::client::tls::rustls_tls::{Error as RustlsError, H1OnlyHttpsConnector}; let rustls_config = self.rustls_client_config()?; - let mut builder = hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(rustls_config) - .https_or_http(); - if let Some(tsn) = self.tls_server_name.as_ref() { - builder = builder.with_server_name_resolver(FixedServerNameResolver::new( - tsn.clone() - .try_into() - .map_err(rustls_tls::Error::InvalidServerName) - .map_err(Error::RustlsTls)?, - )); - } - Ok(builder.enable_http1().wrap_connector(connector)) + let server_name = self + .tls_server_name + .as_ref() + .map(|n| { + rustls::pki_types::ServerName::try_from(n.clone()) + .map_err(RustlsError::InvalidServerName) + .map_err(Error::RustlsTls) + }) + .transpose()?; + Ok(H1OnlyHttpsConnector::new(connector, rustls_config, server_name)) } #[cfg(feature = "openssl-tls")] @@ -313,6 +337,31 @@ impl ConfigExt for Config { } } +impl Config { + #[cfg(feature = "rustls-tls")] + fn rustls_https_connector_builder( + &self, + ) -> Result> { + use hyper_rustls::FixedServerNameResolver; + + use crate::client::tls::rustls_tls; + + let rustls_config = self.rustls_client_config()?; + let mut builder = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(rustls_config) + .https_or_http(); + if let Some(tsn) = self.tls_server_name.as_ref() { + builder = builder.with_server_name_resolver(FixedServerNameResolver::new( + tsn.clone() + .try_into() + .map_err(rustls_tls::Error::InvalidServerName) + .map_err(Error::RustlsTls)?, + )); + } + Ok(builder) + } +} + impl Config { // This is necessary to retrieve an identity when an exec plugin // returns a client certificate and key instead of a token. diff --git a/kube-client/src/client/middleware/mod.rs b/kube-client/src/client/middleware/mod.rs index 246aed6a6..b778edcde 100644 --- a/kube-client/src/client/middleware/mod.rs +++ b/kube-client/src/client/middleware/mod.rs @@ -10,6 +10,7 @@ pub use extra_headers::{ExtraHeaders, ExtraHeadersLayer}; use super::auth::RefreshableToken; /// Layer to set up `Authorization` header depending on the config. +#[derive(Clone)] pub struct AuthLayer(pub(crate) Either>); impl Layer for AuthLayer { diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index e4ca35623..6c1b079ca 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -11,8 +11,7 @@ use either::{Either, Left, Right}; use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture}; use http::{self, Request, Response}; use http_body_util::BodyExt; -#[cfg(feature = "ws")] -use hyper_util::rt::TokioIo; +#[cfg(feature = "ws")] use hyper_util::rt::TokioIo; use jiff::Timestamp; use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status}; @@ -49,15 +48,12 @@ pub use config_ext::ConfigExt; pub mod middleware; pub mod retry; -#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] -mod tls; +#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls; #[cfg(feature = "openssl-tls")] pub use tls::openssl_tls::Error as OpensslTlsError; -#[cfg(feature = "rustls-tls")] -pub use tls::rustls_tls::Error as RustlsTlsError; -#[cfg(feature = "ws")] -mod upgrade; +#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError; +#[cfg(feature = "ws")] mod upgrade; #[cfg(feature = "oauth")] #[cfg_attr(docsrs, doc(cfg(feature = "oauth")))] @@ -67,8 +63,7 @@ pub use auth::OAuthError; #[cfg_attr(docsrs, doc(cfg(feature = "oidc")))] pub use auth::oidc_errors; -#[cfg(feature = "ws")] -pub use upgrade::UpgradeConnectionError; +#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError; #[cfg(feature = "kubelet-debug")] #[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))] @@ -88,6 +83,10 @@ pub struct Client { // - `Buffer` for cheap clone // - `BoxFuture` for dynamic response future type inner: Buffer, BoxFuture<'static, Result, BoxError>>>, + // HTTP/1.1-only transport for the upgrade path used by exec, attach, + // and port-forward. Defaults to a clone of `inner` when not separately + // configured; see [`Client::new_with_upgrade`]. + upgrade_inner: Buffer, BoxFuture<'static, Result, BoxError>>>, default_ns: String, valid_until: Option, } @@ -115,6 +114,29 @@ impl Connection { } } +async fn send_via( + svc: &Buffer, BoxFuture<'static, Result, BoxError>>>, + request: Request, +) -> Result> { + let mut svc = svc.clone(); + let res = svc + .ready() + .await + .map_err(Error::Service)? + .call(request) + .await + .map_err(|err| { + // Error decorating request + err.downcast::() + .map(|e| *e) + // Error requesting + .or_else(|err| err.downcast::().map(|err| Error::HyperError(*err))) + // Error from another middleware + .unwrap_or_else(Error::Service) + })?; + Ok(res) +} + /// Constructors and low-level api interfaces. /// /// Most users only need [`Client::try_default`] or [`Client::new`] from this block. @@ -164,8 +186,51 @@ impl Client { .map_response_body(Body::wrap_body) .map_err(Into::into) .boxed(); + let inner = Buffer::new(service, 1024); + let upgrade_inner = inner.clone(); + Self { + inner, + upgrade_inner, + default_ns: default_namespace.into(), + valid_until: None, + } + } + + /// Create a [`Client`] with separate primary and upgrade [`Service`] stacks. + /// + /// The `service` is used for normal traffic; `upgrade_service` is used by + /// [`Client::connect`] for exec, attach, and port-forward, which require + /// HTTP/1.1. + /// + /// Most users do not need this; [`Client::new`] (or [`Client::try_from`]) + /// is sufficient. Use this constructor when supplying a custom `Service` + /// stack that may negotiate HTTP/2 *and* the application also uses + /// upgrade subresources. + pub fn new_with_upgrade(service: S, upgrade_service: U, default_namespace: T) -> Self + where + S: Service, Response = Response> + Send + 'static, + S::Future: Send + 'static, + S::Error: Into, + B1: http_body::Body + Send + 'static, + B1::Error: Into, + U: Service, Response = Response> + Send + 'static, + U::Future: Send + 'static, + U::Error: Into, + B2: http_body::Body + Send + 'static, + B2::Error: Into, + T: Into, + { + let service = service + .map_response_body(Body::wrap_body) + .map_err(Into::into) + .boxed(); + let upgrade_service = upgrade_service + .map_response_body(Body::wrap_body) + .map_err(Into::into) + .boxed(); Self { inner: Buffer::new(service, 1024), + upgrade_inner: Buffer::new(upgrade_service, 1024), default_ns: default_namespace.into(), valid_until: None, } @@ -215,23 +280,12 @@ impl Client { /// This method can be used to get raw access to the API which may be used to, for example, /// create a proxy server or application-level gateway between localhost and the API server. pub async fn send(&self, request: Request) -> Result> { - let mut svc = self.inner.clone(); - let res = svc - .ready() - .await - .map_err(Error::Service)? - .call(request) - .await - .map_err(|err| { - // Error decorating request - err.downcast::() - .map(|e| *e) - // Error requesting - .or_else(|err| err.downcast::().map(|err| Error::HyperError(*err))) - // Error from another middleware - .unwrap_or_else(Error::Service) - })?; - Ok(res) + send_via(&self.inner, request).await + } + + #[cfg(feature = "ws")] + async fn send_upgrade(&self, request: Request) -> Result> { + send_via(&self.upgrade_inner, request).await } /// Make WebSocket connection. @@ -257,7 +311,9 @@ impl Client { ); upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?; - let res = self.send(Request::from_parts(parts, Body::from(body))).await?; + let res = self + .send_upgrade(Request::from_parts(parts, Body::from(body))) + .await?; let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?; match hyper::upgrade::on(res).await { Ok(upgraded) => Ok(Connection { diff --git a/kube-client/src/client/tls.rs b/kube-client/src/client/tls.rs index 43fc258ba..2941fd674 100644 --- a/kube-client/src/client/tls.rs +++ b/kube-client/src/client/tls.rs @@ -352,6 +352,133 @@ FRU= ] } } + + /// HTTP/1.1-only HTTPS connector that mirrors `hyper_rustls::HttpsConnector` + /// but allows specifying both an explicit ALPN advertisement and a custom + /// TLS server name. + /// + /// Why this exists: hyper-rustls' builder asserts that the rustls + /// `ClientConfig`'s `alpn_protocols` is empty in `with_tls_config`, and + /// only `enable_http2()` populates it afterwards. The `enable_http1()`-only + /// builder path therefore leaves ALPN empty -- which means no ALPN + /// extension is sent on the wire and a modern apiserver may still + /// negotiate HTTP/2. We need an explicit `http/1.1` advertisement to force + /// HTTP/1.1 for the upgrade transport, otherwise upgrades break. + /// + /// The `From<(H, Arc)>` impl on `hyper_rustls::HttpsConnector` + /// would let us bypass the assertion, but it constructs the connector + /// with the *default* server-name resolver and there is no public API to + /// swap that resolver afterwards. Users who set `Config::tls_server_name` + /// would silently lose SNI override on the upgrade transport, breaking + /// cert validation against alternate-host clusters. So we reimplement + /// the small TCP-then-TLS dance here over public hyper-rustls types, + /// which lets us honour `tls_server_name` without an upstream change. + pub struct H1OnlyHttpsConnector { + http: H, + tls_config: std::sync::Arc, + server_name: Option>, + } + + impl Clone for H1OnlyHttpsConnector { + fn clone(&self) -> Self { + Self { + http: self.http.clone(), + tls_config: self.tls_config.clone(), + server_name: self.server_name.clone(), + } + } + } + + impl H1OnlyHttpsConnector { + pub fn new(http: H, mut tls_config: ClientConfig, server_name: Option>) -> Self { + tls_config.alpn_protocols = vec![b"http/1.1".to_vec()]; + Self { + http, + tls_config: std::sync::Arc::new(tls_config), + server_name, + } + } + } + + impl tower::Service for H1OnlyHttpsConnector + where + H: tower::Service + Send + Clone + 'static, + H::Response: hyper::rt::Read + + hyper::rt::Write + + hyper_util::client::legacy::connect::Connection + + Unpin + + Send + + 'static, + H::Future: Send + 'static, + H::Error: Into>, + { + type Error = std::io::Error; + type Future = std::pin::Pin< + Box> + Send>, + >; + type Response = hyper_rustls::MaybeHttpsStream; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.http + .poll_ready(cx) + .map_err(|e| std::io::Error::other(e.into())) + } + + fn call(&mut self, dst: http::Uri) -> Self::Future { + // Fall back to plain HTTP for `http://` URIs, matching + // hyper_rustls' default behaviour. + if dst.scheme() == Some(&http::uri::Scheme::HTTP) { + let fut = self.http.call(dst); + return Box::pin(async move { + let s = fut.await.map_err(|e| std::io::Error::other(e.into()))?; + Ok(hyper_rustls::MaybeHttpsStream::Http(s)) + }); + } + if dst.scheme() != Some(&http::uri::Scheme::HTTPS) { + let scheme = dst.scheme().map(|s| s.to_string()).unwrap_or_default(); + return Box::pin(async move { + Err(std::io::Error::other(format!("unsupported scheme {scheme}"))) + }); + } + + // Resolve SNI: explicit override wins, else use the URI host. + let sni = match self.server_name.clone() { + Some(name) => Ok(name), + None => match dst.host() { + Some(host) => { + // Strip surrounding brackets on IPv6 literals. + let host = host.trim_start_matches('[').trim_end_matches(']'); + ServerName::try_from(host.to_owned()).map_err(Error::InvalidServerName) + } + None => { + return Box::pin(async move { Err(std::io::Error::other("missing host in URI")) }); + } + }, + }; + let sni = match sni { + Ok(s) => s, + Err(e) => { + return Box::pin(async move { Err(std::io::Error::other(e)) }); + } + }; + + let cfg = self.tls_config.clone(); + let connecting = self.http.call(dst); + Box::pin(async move { + let tcp = connecting.await.map_err(|e| std::io::Error::other(e.into()))?; + let tls = tokio_rustls::TlsConnector::from(cfg) + .connect(sni, hyper_util::rt::TokioIo::new(tcp)) + .await + .map_err(std::io::Error::other)?; + Ok(hyper_rustls::MaybeHttpsStream::Https( + hyper_util::rt::TokioIo::new(tls), + )) + }) + } + } } #[cfg(feature = "openssl-tls")] diff --git a/kube-client/src/config/mod.rs b/kube-client/src/config/mod.rs index ba85d634d..01a7cedb3 100644 --- a/kube-client/src/config/mod.rs +++ b/kube-client/src/config/mod.rs @@ -163,6 +163,16 @@ pub struct Config { pub auth_info: AuthInfo, /// Whether to disable compression (would only have an effect when the `gzip` feature is enabled) pub disable_compression: bool, + /// Whether to disable HTTP/2. + /// + /// HTTP/2 is enabled by default and negotiated via ALPN where the TLS + /// backend supports it (currently `rustls-tls`). Set this to `true` to + /// force the client to negotiate HTTP/1.1 only, e.g. when working around + /// an intermediary that mishandles HTTP/2. + /// + /// Has no effect on the upgrade transport used by exec, attach, and + /// port-forward, which is always HTTP/1.1. + pub disable_http2: bool, /// Optional proxy URL. Proxy support requires the `socks5` feature. pub proxy_url: Option, /// If set, apiserver certificate will be validated to contain this string @@ -191,6 +201,7 @@ impl Config { accept_invalid_certs: false, auth_info: AuthInfo::default(), disable_compression: false, + disable_http2: false, proxy_url: None, tls_server_name: None, headers: Vec::new(), @@ -275,6 +286,7 @@ impl Config { ..Default::default() }, disable_compression: false, + disable_http2: false, proxy_url: None, tls_server_name: None, headers: Vec::new(), @@ -336,6 +348,7 @@ impl Config { write_timeout: Some(DEFAULT_WRITE_TIMEOUT), accept_invalid_certs, disable_compression, + disable_http2: false, proxy_url: loader.proxy_url()?, auth_info: loader.user, tls_server_name: loader.cluster.tls_server_name,