Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion orion-data-plane-api/src/xds/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl TryFrom<&str> for TypeUrl {
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration" => Ok(TypeUrl::RouteConfiguration),
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" => Ok(TypeUrl::ClusterLoadAssignment),
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret" => Ok(TypeUrl::Secret),
value => Err(XdsError::UnknownResourceType(format!("did not recognise type_url {}", value))),
value => Err(XdsError::UnknownResourceType(format!("did not recognise type_url {value}"))),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions orion-data-plane-api/tests/envoy_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ typed_extension_protocol_options:
assert_eq!(ka.interval.as_ref().unwrap(), &Duration { seconds: 30, nanos: 0 });
assert_eq!(ka.timeout.as_ref().unwrap(), &Duration { seconds: 5, nanos: 0 });
} else {
panic!("Expecting http2 options, got {:?}", cfg);
panic!("Expecting http2 options, got {cfg:?}");
}
} else {
panic!("Expecting ExplicitHttpConfig, got {:?}", upstream_opts);
panic!("Expecting ExplicitHttpConfig, got {upstream_opts:?}");
}
}

Expand Down
4 changes: 1 addition & 3 deletions orion-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ hyper-rustls = { version = "0.27.1", features = ["default", "http2"] }
ipnet = "2.9"
once_cell = { version = "1.19" }
opentelemetry = "0.29.0"
orion-client = { package = "hyper-util", version = "0.1.15", features = [
"full",
] }
hyper-util.workspace = true
orion-configuration.workspace = true
orion-data-plane-api.workspace = true
orion-error.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions orion-lib/src/clusters/retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum EventError {

// DISCLAIMER: This is a workaround for the fact that `EventError` can't implement `Clone`.
// Cloning is not possible because `Elapsed` and `io::Error` do not implement `Clone`.
// Their presence in `EventError` is required by the `orion_client` crate, as it needs
// Their presence in `EventError` is required by the `hyper_util` crate, as it needs
// to traverse the `EventError` to extract either the underlying `io::Error` or `Elapsed`
// in order to produce a more specific error message.
// In this case, we create a new `EventError` by reconstructing the `io::Error`
Expand Down Expand Up @@ -126,7 +126,7 @@ impl<'a> TryInferFrom<&'a (dyn std::error::Error + 'static)> for EventError {
}
}

if let Some(h_err) = err.downcast_ref::<orion_client::client::legacy::Error>() {
if let Some(h_err) = err.downcast_ref::<hyper_util::client::legacy::Error>() {
if let Some(source) = h_err.source() {
return Self::try_infer_from(source);
}
Expand Down
2 changes: 1 addition & 1 deletion orion-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod listeners;
mod secrets;
pub(crate) mod thread_local;
mod transport;
pub(crate) mod utils;
mod utils;

use std::sync::OnceLock;

Expand Down
6 changes: 3 additions & 3 deletions orion-lib/src/listeners/filterchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use crate::{
listeners::{filter_state::DownstreamConnectionMetadata, http_connection_manager::ExtendedRequest},
secrets::{TlsConfigurator, WantsToBuildServer},
transport::AsyncReadWrite,
utils::tokio::TokioExecutor,
AsyncStream, ConversionContext, Error, Result,
};
use compact_str::CompactString;
use futures::TryFutureExt;
use hyper::{service::Service, Request};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as HyperServerBuilder;
use opentelemetry::KeyValue;
use orion_client::{rt::TokioIo, server::conn::auto::Builder as HyperServerBuilder};
use orion_configuration::config::{
listener::{FilterChain as FilterChainConfig, MainFilter},
network_filters::{
Expand Down Expand Up @@ -223,7 +223,7 @@ impl FilterchainType {
};

debug!("{listener_name} tried to negotiate {codec_type:?}, got {selected_codec:?}");
let mut hyper_server = HyperServerBuilder::new(TokioExecutor);
let mut hyper_server = HyperServerBuilder::new(TokioExecutor::new());
let stream = TokioIo::new(stream);
//todo(hayley): we should be applying listener http settings here
hyper_server = match selected_codec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use orion_format::types::ResponseFlags as FmtResponseFlags;

use http::{header, HeaderMap, HeaderValue, StatusCode, Version};
use hyper::{Request, Response};
use orion_client::rt::TokioIo;
use hyper_util::rt::TokioIo;
use orion_configuration::config::network_filters::http_connection_manager::UpgradeType;
use tokio::io::copy_bidirectional;
use tracing::error;
Expand Down
2 changes: 1 addition & 1 deletion orion-lib/src/transport/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{

use http::uri::Authority;
use hyper::Uri;
use orion_client::rt::TokioIo;
use hyper_util::rt::TokioIo;
use orion_error::{Context, WithContext};
use orion_format::types::ResponseFlags;
use pingora_timeout::fast_timeout::fast_timeout;
Expand Down
143 changes: 69 additions & 74 deletions orion-lib/src/transport/http_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::{
},
secrets::{TlsConfigurator, WantsToBuildClient},
thread_local::{LocalBuilder, LocalObject},
utils::tokio::TokioExecutor,
Error, PolyBody, Result,
};
use http::{
Expand All @@ -42,11 +41,11 @@ use http::{
use http_body_util::BodyExt;
use hyper::{body::Incoming, Request, Uri};
use hyper_rustls::{FixedServerNameResolver, HttpsConnector};
use opentelemetry::KeyValue;
use orion_client::{
use hyper_util::{
client::legacy::{connect::Connect, Builder, Client},
rt::tokio::TokioTimer,
rt::tokio::{TokioExecutor, TokioTimer},
};
use opentelemetry::KeyValue;
use orion_configuration::config::{
cluster::http_protocol_options::{Codec, HttpProtocolOptions},
network_filters::http_connection_manager::RetryPolicy,
Expand All @@ -71,10 +70,11 @@ use webpki::types::ServerName;

#[cfg(feature = "metrics")]
use {
orion_client::client::legacy::pool::{ConnectionEvent, EventHandler, Tag},
orion_client::client::legacy::PoolKey,
hyper_util::client::legacy::pool::{ConnectionEvent, EventHandler, Tag},
hyper_util::client::legacy::PoolKey,
std::any::Any,
};
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);

type IncomingResult = (std::result::Result<Response<Incoming>, Error>, Duration);

Expand Down Expand Up @@ -118,6 +118,7 @@ impl HttpChannelClient {
}
}

#[derive(Default)]
pub struct HttpChannelBuilder {
tls: Option<TlsConfigurator<ClientConfig, WantsToBuildClient>>,
authority: Option<Authority>,
Expand All @@ -142,15 +143,7 @@ impl LocalBuilder<HttpsConnector<LocalConnectorWithDNSResolver>, Arc<HttpsClient

impl HttpChannelBuilder {
pub fn new(bind_device: Option<BindDevice>) -> Self {
Self {
tls: None,
authority: None,
cluster_name: None,
bind_device,
http_protocol_options: HttpProtocolOptions::default(),
server_name: None,
connection_timeout: None,
}
Self { bind_device, ..Default::default() }
}

pub fn with_tls(self, tls_configurator: Option<TlsConfigurator<ClientConfig, WantsToBuildClient>>) -> Self {
Expand Down Expand Up @@ -179,96 +172,98 @@ impl HttpChannelBuilder {

#[allow(clippy::cast_sign_loss)]
pub fn build(self) -> crate::Result<HttpChannel> {
let authority = self.authority.clone().ok_or("Authority is mandatory")?;
let mut client_builder = Client::builder(TokioExecutor);

let _cluster_name = self.cluster_name.unwrap_or_default();

client_builder.timer(TokioTimer::new()); // note: legacy client builder is not persistent struct (&mut Self -> &mut Self)
client_builder
// Set an optional timeout for idle sockets being kept-alive. A Timer is required for this to take effect.
.pool_idle_timeout(
self.http_protocol_options.common.idle_timeout.unwrap_or(std::time::Duration::from_secs(30)),
)
// Pass a timer for the timeout...
.pool_timer(TokioTimer::new())
.pool_max_idle_per_host(usize::MAX)
.set_host(false);

let configured_upstream_http_version = self.http_protocol_options.codec;

if matches!(configured_upstream_http_version, Codec::Http2) {
client_builder.http2_only(true);
let http2_options = self.http_protocol_options.http2_options;
if let Some(settings) = &http2_options.keep_alive_settings {
client_builder.http2_keep_alive_interval(settings.keep_alive_interval);
if let Some(timeout) = settings.keep_alive_timeout {
client_builder.http2_keep_alive_timeout(timeout);
}
client_builder.http2_keep_alive_while_idle(true);
}
client_builder.http2_initial_connection_window_size(http2_options.initial_connection_window_size());
client_builder.http2_initial_stream_window_size(http2_options.initial_stream_window_size());
//fixme(hayley): this is not max_concurrent_streams! this is reset streams
if let Some(max) = http2_options.max_concurrent_streams() {
client_builder.http2_max_concurrent_reset_streams(max);
}
}

#[cfg(feature = "metrics")]
client_builder.event_handler(EventHandler::new(update_upstream_stats, _cluster_name));
let authority = self.authority.clone().ok_or_else(|| Error::from("Authority is mandatory"))?;
let client_builder = self.configure_hyper_client();

if let Some(tls_context) = self.tls {
let builder = hyper_rustls::HttpsConnectorBuilder::new();
let builder = builder.with_tls_config(tls_context.into_inner());
let builder = builder.https_or_http();
let builder = if let Some(server_name) = self.server_name {
// Build TLS client inline to avoid ownership issues
let mut builder =
hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(tls_context.into_inner()).https_or_http();

builder = if let Some(server_name) = self.server_name {
builder.with_server_name_resolver(FixedServerNameResolver::new(server_name))
} else {
let server_name = ServerName::try_from(authority.host().to_owned())?;
debug!("Server name is not configured in bootstrap.. using endpoint authority {:?}", server_name);
Copy link

Copilot AI Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed typo: 'boostrap' should be 'bootstrap'

Suggested change
debug!("Server name is not configured in bootstrap.. using endpoint authority {:?}", server_name);
debug!("Server name is not configured.. using endpoint authority {:?}", server_name);

Copilot uses AI. Check for mistakes.
builder.with_server_name_resolver(FixedServerNameResolver::new(server_name))
};

let connector = LocalConnectorWithDNSResolver {
addr: authority.clone(),
cluster_name: self.cluster_name.unwrap_or_default(),
bind_device: self.bind_device,
timeout: self.connection_timeout,
};

let tls_connector = match self.http_protocol_options.codec {
Codec::Http2 => builder.enable_http2().wrap_connector(LocalConnectorWithDNSResolver {
addr: authority.clone(),
cluster_name: self.cluster_name.unwrap_or_default(),
bind_device: self.bind_device,
timeout: self.connection_timeout,
}),

Codec::Http1 => builder.enable_http1().wrap_connector(LocalConnectorWithDNSResolver {
addr: authority.clone(),
cluster_name: self.cluster_name.unwrap_or_default(),
bind_device: self.bind_device,
timeout: self.connection_timeout,
}),
Codec::Http2 => builder.enable_http2().wrap_connector(connector),
Codec::Http1 => builder.enable_http1().wrap_connector(connector),
};

Ok(HttpChannel {
client: HttpChannelClient::Tls(ClientContext::new(
configured_upstream_http_version,
self.http_protocol_options.codec,
Arc::new(LocalObject::new(client_builder, tls_connector)),
)),
http_version: configured_upstream_http_version,
http_version: self.http_protocol_options.codec,
upstream_authority: authority,
cluster_name: self.cluster_name.unwrap_or_default(),
})
} else {
// Build plain client inline
let connector = LocalConnectorWithDNSResolver {
addr: authority.clone(),
cluster_name: self.cluster_name.unwrap_or_default(),
bind_device: self.bind_device,
timeout: self.connection_timeout,
cluster_name: self.cluster_name.unwrap_or_default(),
};

Ok(HttpChannel {
client: HttpChannelClient::Plain(Arc::new(LocalObject::new(client_builder, connector))),
http_version: configured_upstream_http_version,
http_version: self.http_protocol_options.codec,
upstream_authority: authority,
cluster_name: self.cluster_name.unwrap_or_default(),
})
}
}

fn configure_hyper_client(&self) -> Builder {
let mut client_builder = Client::builder(TokioExecutor::new());
client_builder
.timer(TokioTimer::new())
.pool_idle_timeout(self.http_protocol_options.common.idle_timeout.unwrap_or(DEFAULT_IDLE_TIMEOUT))
.pool_timer(TokioTimer::new())
.pool_max_idle_per_host(usize::MAX)
.set_host(false);

let configured_upstream_http_version = self.http_protocol_options.codec;

self.configure_http2_if_needed(&mut client_builder, configured_upstream_http_version);

client_builder
}

fn configure_http2_if_needed(&self, client_builder: &mut Builder, version: Codec) {
if matches!(version, Codec::Http2) {
client_builder.http2_only(true);
let http2_options = &self.http_protocol_options.http2_options;

if let Some(settings) = &http2_options.keep_alive_settings {
client_builder.http2_keep_alive_interval(settings.keep_alive_interval);
if let Some(timeout) = settings.keep_alive_timeout {
client_builder.http2_keep_alive_timeout(timeout);
}
client_builder.http2_keep_alive_while_idle(true);
}

client_builder.http2_initial_connection_window_size(http2_options.initial_connection_window_size());
client_builder.http2_initial_stream_window_size(http2_options.initial_stream_window_size());

if let Some(max) = http2_options.max_concurrent_streams() {
client_builder.http2_max_concurrent_reset_streams(max);
}
Comment on lines +262 to +264
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The configuration option http2_options.max_concurrent_streams() is being used to set http2_max_concurrent_reset_streams. These are two distinct HTTP/2 settings. max_concurrent_streams controls the maximum number of concurrent streams, while max_concurrent_reset_streams limits how many streams can be reset by the client, as a denial-of-service protection.

Using the value for one to configure the other is a bug and will lead to unexpected behavior. The fixme comment that pointed this out was removed in this refactoring, but the issue remains.

To fix this, the field in Http2ProtocolOptions should be renamed to max_concurrent_reset_streams to accurately reflect what is being configured. Since that file is not part of this PR, you should at least restore the //fixme comment to track this issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore

}
}
}

#[cfg(feature = "metrics")]
Expand Down
1 change: 0 additions & 1 deletion orion-lib/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,3 @@

pub mod http;
pub mod rewindable_stream;
pub mod tokio;
35 changes: 0 additions & 35 deletions orion-lib/src/utils/tokio.rs

This file was deleted.

Loading