Commit 4e33b1a

Split client into HTTP/2 and HTTP/1.1 transports
The previous commit on this branch enabled HTTP/2 on the rustls
connector by switching ALPN from `enable_http1()` alone to
`enable_http1().enable_http2()`. REST traffic, watch streams and log
streaming benefit from HTTP/2 multiplexing, but exec, attach and
port-forward use HTTP/1.1 connection upgrades, which are
unrepresentable on an HTTP/2 connection. After ALPN negotiates `h2`,
the apiserver answered upgrade requests with 4xx/5xx and the
integration suite started failing with
`UpgradeConnection(ProtocolSwitch(...))`.

Fix this by giving `Client` two transports with separate connection
pools and ALPN policies:

* a primary, h2-capable transport for normal traffic, configured with
  `TokioTimer` and HTTP/2 keep-alive PINGs (interval 30s, while-idle
  on) so watch streams survive idle-killing intermediaries such as
  HAProxy;
* an HTTP/1.1-only transport used by `Client::connect()`, the only
  caller of `hyper::upgrade::on` in the workspace, hard-routed there
  via a new `upgrade_inner` field on `Client`.

A subtle point governs the h1-only path on rustls. hyper-rustls'
builder asserts that `alpn_protocols.is_empty()` when accepting a
rustls config, and only `enable_http2()` populates the list. The
`enable_http1()`-only builder path therefore leaves ALPN empty, which
means no ALPN extension is sent on the wire and a modern apiserver may
still negotiate HTTP/2 -- the very condition we are trying to avoid.
The h1-only sibling must advertise `http/1.1` *explicitly*. The natural
workaround -- `HttpsConnector::from((http, Arc::new(rustls_config)))`
with `alpn_protocols = vec![b"http/1.1".to_vec()]` -- bypasses the
builder's assertion but also drops `Config::tls_server_name`, because
the `From` impl constructs the connector with the default server-name
resolver and the resolver field is private.

Rather than depend on an upstream hyper-rustls change, ship a small
`H1OnlyHttpsConnector<H>` in `kube-client/src/client/tls.rs` that
mirrors the public TCP-then-TLS dance from
`hyper_rustls::HttpsConnector::call`, sets the explicit ALPN
advertisement, and resolves SNI via `Config::tls_server_name` when set
or from the URI host otherwise. This adds `tokio-rustls` as a direct
dep gated on `rustls-tls` (already in the dep tree transitively via
hyper-rustls).

Other invariants worth preserving:

* `Config::auth_layer()` and `Config::extra_headers_layer()` are
  computed once and *cloned* into both transport stacks. Calling
  `auth_layer()` twice would mint independent `RefreshableToken` state,
  so each transport would refresh tokens on its own and could diverge
  under exec-plugin or token-file auth. `AuthLayer` gains
  `#[derive(Clone)]` for this; the inner `Either` was already trivially
  cloneable.
* `Config` gains a public `disable_http2: bool` field as a runtime
  escape hatch. When set, the primary transport falls back to the
  h1-only client; the two-client shape stays the same so the upgrade
  path is unaffected. HTTP/2 stays on by default; the field matches the
  existing `disable_compression` style.
* `ClientBuilder` grows `with_upgrade_service`, paired with a new
  `Client::new_with_upgrade` constructor for custom-service users who
  supply their own service stack and need to opt in to the split. The
  single-service `Client::new` keeps its signature; it internally
  clones the buffered handle into `upgrade_inner` so back-compat is
  preserved.

Verified against k3d v1.34.1 with the integration suite, the
`pod_exec`, `pod_attach`, `pod_portforward*`, `pod_shell*` and
`log_stream` examples, and a full 1200-combo `cargo hack` feature
powerset.

Signed-off-by: Andrew McDermott <aim@frobware.com>
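To illustrate the ALPN reasoning in the commit message, here is a minimal, std-only sketch of server-side protocol selection in the style of RFC 7301. It is a simplified model, not the code from this commit: `select_alpn`, `primary_alpn` and `upgrade_alpn` are hypothetical names introduced for illustration, and the server preference list is an assumption. Only the byte strings `h2` and `http/1.1` and the `vec![b"http/1.1".to_vec()]` shape come from the message.

```rust
/// Simplified model of server-side ALPN selection: return the first
/// server-preferred protocol that the client also offered. `None` means
/// ALPN did not settle anything (no extension sent, or no overlap), so
/// the choice of protocol is left to the server's own defaults.
fn select_alpn<'a>(server_prefs: &[&'a [u8]], client_offer: &[Vec<u8>]) -> Option<&'a [u8]> {
    server_prefs
        .iter()
        .copied()
        .find(|proto| client_offer.iter().any(|offered| offered.as_slice() == *proto))
}

/// Offer of the primary transport as described above: h2 first, then HTTP/1.1.
fn primary_alpn() -> Vec<Vec<u8>> {
    vec![b"h2".to_vec(), b"http/1.1".to_vec()]
}

/// Offer of the upgrade transport: HTTP/1.1 only, advertised explicitly.
fn upgrade_alpn() -> Vec<Vec<u8>> {
    vec![b"http/1.1".to_vec()]
}
```

Under this model, a server that prefers `h2` selects `h2` for `primary_alpn()` and is pinned to `http/1.1` for `upgrade_alpn()`, while an empty offer yields `None`, which is the ambiguous case the explicit advertisement is meant to rule out.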
1 parent 442672d commit 4e33b1a

7 files changed

Lines changed: 483 additions & 81 deletions

kube-client/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -13,7 +13,7 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin
 
 [features]
 default = ["client", "ring"]
-rustls-tls = ["rustls", "hyper-rustls"]
+rustls-tls = ["rustls", "hyper-rustls", "tokio-rustls"]
 webpki-roots = ["hyper-rustls/webpki-roots"]
 aws-lc-rs = ["hyper-rustls?/aws-lc-rs"]
 ring = ["hyper-rustls?/ring"]
@@ -57,6 +57,7 @@ jiff = { workspace = true, optional = true, features = ["std", "serde"] }
 pem = { workspace = true, optional = true }
 openssl = { workspace = true, optional = true }
 rustls = { workspace = true, optional = true }
+tokio-rustls = { version = "0.26", default-features = false, optional = true }
 bytes = { workspace = true, optional = true }
 tokio = { workspace = true, features = ["time", "signal", "sync", "rt"], optional = true }
 kube-core = { path = "../kube-core", version = "=3.1.0" }

kube-client/src/client/builder.rs

Lines changed: 192 additions & 37 deletions
@@ -8,7 +8,7 @@ use hyper_timeout::TimeoutConnector;
 
 use hyper_util::{
     client::legacy::connect::{Connection, HttpConnector},
-    rt::TokioExecutor,
+    rt::{TokioExecutor, TokioTimer},
 };
 
 use jiff::Timestamp;
@@ -28,6 +28,7 @@ pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send +
 /// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
 pub struct ClientBuilder<Svc> {
     service: Svc,
+    upgrade_service: Option<GenericService>,
     default_ns: String,
     valid_until: Option<Timestamp>,
 }
@@ -43,20 +44,29 @@ impl<Svc> ClientBuilder<Svc> {
     {
         Self {
             service,
+            upgrade_service: None,
             default_ns: default_namespace.into(),
             valid_until: None,
         }
     }
 
     /// Add a [`Layer`] to the current [`Service`] stack.
+    ///
+    /// The layer is applied to the primary [`Service`] only. If an upgrade
+    /// service has been set via [`with_upgrade_service`](Self::with_upgrade_service)
+    /// it is left untouched; users wanting to layer both must apply the
+    /// layer to each service themselves before calling
+    /// [`with_upgrade_service`].
     pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
         let Self {
             service: stack,
+            upgrade_service,
             default_ns,
             valid_until,
         } = self;
         ClientBuilder {
             service: layer.layer(stack),
+            upgrade_service,
             default_ns,
             valid_until,
         }
@@ -66,11 +76,33 @@ impl<Svc> ClientBuilder<Svc> {
     pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
         ClientBuilder {
             service: self.service,
+            upgrade_service: self.upgrade_service,
             default_ns: self.default_ns,
             valid_until,
         }
     }
 
+    /// Provide a separate [`Service`] used by the upgrade transport that
+    /// backs exec, attach, and port-forward.
+    ///
+    /// The supplied service is the same shape as [`GenericService`], the
+    /// boxed service produced by the default builder stack. Custom-service
+    /// users that do not naturally arrive at this shape can instead call
+    /// [`Client::new_with_upgrade`] directly.
+    ///
+    /// This is required only if the primary service may negotiate HTTP/2
+    /// *and* the application also uses upgrade subresources. HTTP/1.1
+    /// upgrades are unrepresentable on an HTTP/2 connection, so the
+    /// upgrade transport must offer only HTTP/1.1.
+    pub fn with_upgrade_service(self, upgrade_service: GenericService) -> Self {
+        ClientBuilder {
+            service: self.service,
+            upgrade_service: Some(upgrade_service),
+            default_ns: self.default_ns,
+            valid_until: self.valid_until,
+        }
+    }
+
     /// Build a [`Client`] instance with the current [`Service`] stack.
     pub fn build<B>(self) -> Client
     where
@@ -80,7 +112,17 @@ impl<Svc> ClientBuilder<Svc> {
         B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
         B::Error: Into<BoxError>,
     {
-        Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
+        let Self {
+            service,
+            upgrade_service,
+            default_ns,
+            valid_until,
+        } = self;
+        match upgrade_service {
+            Some(upgrade) => Client::new_with_upgrade(service, upgrade, default_ns),
+            None => Client::new(service, default_ns),
+        }
+        .with_valid_until(valid_until)
     }
 }
 
@@ -132,10 +174,13 @@ impl TryFrom<Config> for ClientBuilder<GenericService> {
                             use base64::Engine;
                             use http::HeaderValue;
 
-                            let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo));
+                            let value = format!(
+                                "Basic {}",
+                                base64::engine::general_purpose::STANDARD.encode(userinfo)
+                            );
                             let header = HeaderValue::from_str(&value).unwrap();
                             connector = connector.with_auth(header);
-                        }
+                        }
                     }
 
                     make_generic_builder(connector, config)
@@ -167,34 +212,152 @@ where
     H::Error: 'static + Send + Sync + std::error::Error,
 {
     let default_ns = config.default_namespace.clone();
+
+    // Build two hyper clients with separate connection pools and ALPN policies:
+    // - the primary, h2-capable transport for normal REST/watch/log traffic
+    // (subject to `Config::disable_http2`)
+    // - an HTTP/1.1-only transport for the upgrade path used by exec, attach,
+    // and port-forward, regardless of `disable_http2`.
+    //
+    // Current TLS feature precedence when more than one is set:
+    // 1. rustls-tls
+    // 2. openssl-tls
+    // If neither TLS feature is enabled, the http connector is used; only the
+    // http scheme is supported in that case.
+    // Compute auth and extra-headers layers once and share across both
+    // transports. Calling `auth_layer()` twice would mint independent
+    // `RefreshableToken` state per transport, so each path would refresh
+    // tokens on its own and they'd diverge under exec-plugin or token-file
+    // auth.
     let auth_layer = config.auth_layer()?;
+    let extra_headers_layer = config.extra_headers_layer()?;
+
+    // The two transports use connectors with different concrete types after
+    // TLS wrapping (h2-capable vs explicit-h1 ALPN), so each path is built
+    // and wrapped independently and erased to a `GenericService` here rather
+    // than threading the connector type through.
+    let main_service = build_main_service(
+        connector.clone(),
+        &config,
+        auth_layer.clone(),
+        extra_headers_layer.clone(),
+    )?;
+    let upgrade_service = build_upgrade_service(connector, &config, auth_layer, extra_headers_layer)?;
+
+    let (_, expiration) = config.exec_identity_pem();
+
+    let client = ClientBuilder::new(main_service, default_ns)
+        .with_upgrade_service(upgrade_service)
+        .with_valid_until(expiration);
+
+    Ok(client)
+}
+
+/// Build the primary, h2-capable transport service.
+///
+/// Uses the dual-protocol TLS connector (rustls advertises `h2,http/1.1`
+/// in ALPN; openssl currently advertises nothing -- parity work is a
+/// follow-up). The hyper client carries `TokioTimer` and HTTP/2
+/// keep-alive PINGs so watch streams survive idle-killing intermediaries
+/// such as HAProxy.
+///
+/// If `Config::disable_http2` is set, falls back to building a structurally
+/// identical service via [`build_upgrade_service`] -- both clients then
+/// carry HTTP/1.1-only connectors but the two-client shape stays the same.
+fn build_main_service<H>(
+    connector: H,
+    config: &Config,
+    auth_layer: Option<crate::client::middleware::AuthLayer>,
+    extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
+) -> Result<GenericService, Error>
+where
+    H: 'static + Clone + Send + Sync + Service<http::Uri>,
+    H::Response: 'static + Connection + Read + Write + Send + Unpin,
+    H::Future: 'static + Send,
+    H::Error: 'static + Send + Sync + std::error::Error,
+{
+    if config.disable_http2 {
+        return build_upgrade_service(connector, config, auth_layer, extra_headers_layer);
+    }
 
-    let client: hyper_util::client::legacy::Client<_, Body> = {
-        // Current TLS feature precedence when more than one are set:
-        // 1. rustls-tls
-        // 2. openssl-tls
-        // Create a custom client to use something else.
-        // If TLS features are not enabled, http connector will be used.
-        #[cfg(feature = "rustls-tls")]
-        let connector = config.rustls_https_connector_with_connector(connector)?;
-        #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
-        let connector = config.openssl_https_connector_with_connector(connector)?;
-        #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
+    #[cfg(feature = "rustls-tls")]
+    let connector = config.rustls_https_connector_with_connector(connector)?;
+    #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
+    let connector = config.openssl_https_connector_with_connector(connector)?;
+    #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
+    {
         if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
-            // no tls stack situation only works with http scheme
             return Err(Error::TlsRequired);
         }
+    }
 
-        let mut connector = TimeoutConnector::new(connector);
+    let mut connector = TimeoutConnector::new(connector);
+    connector.set_connect_timeout(config.connect_timeout);
+    connector.set_read_timeout(config.read_timeout);
+    connector.set_write_timeout(config.write_timeout);
+
+    let mut builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
+    builder
+        .timer(TokioTimer::new())
+        .http2_keep_alive_interval(Duration::from_secs(30))
+        .http2_keep_alive_while_idle(true);
+    let client = builder.build(connector);
+    wrap_with_layers(client, config, auth_layer, extra_headers_layer)
+}
+
+/// Build the HTTP/1.1-only upgrade transport service.
+///
+/// Used by exec, attach, and port-forward; HTTP/1.1 upgrades are
+/// unrepresentable on an HTTP/2 connection. The connector explicitly
+/// advertises `http/1.1` in ALPN (rustls) so the server cannot pick
+/// HTTP/2 at the TLS handshake.
+fn build_upgrade_service<H>(
+    connector: H,
+    config: &Config,
+    auth_layer: Option<crate::client::middleware::AuthLayer>,
+    extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
+) -> Result<GenericService, Error>
+where
+    H: 'static + Clone + Send + Sync + Service<http::Uri>,
+    H::Response: 'static + Connection + Read + Write + Send + Unpin,
+    H::Future: 'static + Send,
+    H::Error: 'static + Send + Sync + std::error::Error,
+{
+    #[cfg(feature = "rustls-tls")]
+    let connector = config.rustls_https_connector_http1_only_with_connector(connector)?;
+    #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
+    let connector = config.openssl_https_connector_with_connector(connector)?;
+    #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
+    {
+        if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
+            return Err(Error::TlsRequired);
+        }
+    }
 
-        // Set the timeouts for the client
-        connector.set_connect_timeout(config.connect_timeout);
-        connector.set_read_timeout(config.read_timeout);
-        connector.set_write_timeout(config.write_timeout);
+    let mut connector = TimeoutConnector::new(connector);
+    connector.set_connect_timeout(config.connect_timeout);
+    connector.set_read_timeout(config.read_timeout);
+    connector.set_write_timeout(config.write_timeout);
 
-        hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
-    };
+    let builder = hyper_util::client::legacy::Builder::new(TokioExecutor::new());
+    let client = builder.build(connector);
+    wrap_with_layers(client, config, auth_layer, extra_headers_layer)
+}
 
+/// Wrap a hyper client with the standard tower layer stack (base URI, gzip,
+/// auth, extra headers, tracing) and erase to a [`GenericService`].
+fn wrap_with_layers<C>(
+    client: hyper_util::client::legacy::Client<C, Body>,
+    config: &Config,
+    auth_layer: Option<crate::client::middleware::AuthLayer>,
+    extra_headers_layer: crate::client::middleware::ExtraHeadersLayer,
+) -> Result<GenericService, Error>
+where
+    C: 'static + Clone + Send + Sync + Service<http::Uri>,
+    C::Response: 'static + Connection + Read + Write + Send + Unpin,
+    C::Future: 'static + Send + Unpin,
+    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+{
     let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
     #[cfg(feature = "gzip")]
     let stack = ServiceBuilder::new()
@@ -211,7 +374,7 @@ where
     let service = ServiceBuilder::new()
         .layer(stack)
         .option_layer(auth_layer)
-        .layer(config.extra_headers_layer()?)
+        .layer(extra_headers_layer)
         .layer(
             // Attribute names follow [Semantic Conventions].
             // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
@@ -263,19 +426,11 @@ where
         .map_err(BoxError::from)
         .service(client);
 
-    let (_, expiration) = config.exec_identity_pem();
-
-    let client = ClientBuilder::new(
-        service
-            .map_response_body(|body| {
-                Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
-            })
-            .boxed(),
-        default_ns,
-    )
-    .with_valid_until(expiration);
-
-    Ok(client)
+    Ok(service
+        .map_response_body(|body| {
+            Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
+        })
+        .boxed())
 }
 
 #[cfg(test)]
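
The comment in `make_generic_builder` about computing `auth_layer()` once and cloning it into both transports can be illustrated with a small std-only sketch. `ToyAuthLayer` is a hypothetical stand-in, not kube's `AuthLayer`; it assumes the refreshable token state sits behind an `Arc`, so that a clone shares it while a second construction does not.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an auth layer that owns refreshable token
/// state. Cloning copies the `Arc` handle, not the token itself.
#[derive(Clone)]
struct ToyAuthLayer {
    token: Arc<Mutex<String>>,
}

impl ToyAuthLayer {
    /// Mint a layer with its own, independent token state.
    fn new(initial: &str) -> Self {
        Self {
            token: Arc::new(Mutex::new(initial.to_string())),
        }
    }

    /// Replace the token, as a refresh would.
    fn refresh(&self, fresh: &str) {
        *self.token.lock().unwrap() = fresh.to_string();
    }

    /// Read the token that the next request would carry.
    fn current(&self) -> String {
        self.token.lock().unwrap().clone()
    }
}
```

In this model, building one layer and handing a clone to each transport means a refresh on the primary path is visible on the upgrade path, whereas calling `ToyAuthLayer::new` twice leaves the second instance holding the stale token.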
