From e591e54c0b56bd599e286fe46723bdb36b608c79 Mon Sep 17 00:00:00 2001 From: Oriol Arcas Date: Mon, 16 Dec 2024 10:57:51 +0000 Subject: [PATCH 01/64] Start logging before loading config Signed-off-by: Oriol Arcas --- orion-proxy/src/lib.rs | 156 ++++++++++++++++++++++++++++++++--------- 1 file changed, 123 insertions(+), 33 deletions(-) diff --git a/orion-proxy/src/lib.rs b/orion-proxy/src/lib.rs index 565a4252..f9051244 100644 --- a/orion-proxy/src/lib.rs +++ b/orion-proxy/src/lib.rs @@ -18,13 +18,8 @@ // // -use orion_configuration::{ - config::{Config, Log as LogConf}, - options::Options, -}; +use orion_configuration::{config::Config, options::Options}; use orion_lib::{Result, RUNTIME_CONFIG}; -use tracing_appender::non_blocking::WorkerGuard; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; #[macro_use] mod core_affinity; @@ -33,37 +28,104 @@ mod runtime; mod xds_configurator; pub fn run() -> Result<()> { + let mut tracing_manager = proxy_tracing::TracingManager::new(); + let options = Options::parse_options(); let Config { runtime, logging, bootstrap } = Config::new(&options)?; RUNTIME_CONFIG.set(runtime).map_err(|_| "runtime config was somehow set before we had a chance to set it")?; - let _guard = init_tracing(logging); + + tracing_manager.update(logging)?; + #[cfg(target_os = "linux")] if !(caps::has_cap(None, caps::CapSet::Permitted, caps::Capability::CAP_NET_RAW)?) 
{ tracing::warn!("CAP_NET_RAW is NOT available, SO_BINDTODEVICE will not work"); } + proxy::run_proxy(bootstrap) } -#[cfg(feature = "console")] -use console_subscriber; +#[cfg(not(feature = "console"))] +mod proxy_tracing { + use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; + use tracing_subscriber::fmt::format::{DefaultFields, Format}; + use tracing_subscriber::layer::Layered; + use tracing_subscriber::{fmt, reload}; + use tracing_subscriber::{reload::Handle, EnvFilter, Registry}; -#[cfg(feature = "console")] -fn init_tracing(_conf: LogConf) -> WorkerGuard { - let (_non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); - console_subscriber::init(); - guard -} + use orion_configuration::config::Log as LogConf; + use orion_lib::Result; -#[cfg(not(feature = "console"))] -fn init_tracing(log_conf: LogConf) -> WorkerGuard { - let env_filter = EnvFilter::try_from_default_env().ok().or(log_conf.log_level).unwrap_or_else(|| { - EnvFilter::builder() - .with_default_directive(tracing_subscriber::filter::LevelFilter::ERROR.into()) - .parse_lossy("") - }); - - match log_conf.log_file.as_ref() { - None => { + type RegistryLayer = + fmt::Layer, Registry>, DefaultFields, Format, NonBlocking>; + type FilterReloadHandle = Handle; + type LayerReloadHandle = Handle< + fmt::Layer, Registry>, DefaultFields, Format, NonBlocking>, + Layered, Registry>, + >; + + pub struct TracingManager { + guard: WorkerGuard, + layer_reload_handle: LayerReloadHandle, + filter_reload_handle: FilterReloadHandle, + } + + impl TracingManager { + pub fn new() -> Self { + let level = EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + .parse_lossy(""); + let (guard, layer_reload_handle, filter_reload_handle) = Self::init_tracing(Registry::default(), level); + TracingManager { guard, filter_reload_handle, layer_reload_handle } + } + + pub fn update(&mut self, log_conf: LogConf) -> Result<()> { + // Update log level + 
self.filter_reload_handle.modify(|filter| { + *filter = EnvFilter::try_from_default_env().ok().or(log_conf.log_level).unwrap_or_else(|| { + EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::ERROR.into()) + .parse_lossy("") + }); + })?; + + // Update tracing layer if necessary (stdout -> file) + if let Some(log_file) = log_conf.log_file { + self.layer_reload_handle.modify(|layer| { + let (new_guard, new_layer) = Self::file_layer(&log_file, log_conf.log_directory); + *layer = new_layer; + self.guard = new_guard; + })?; + } + + Ok(()) + } + + #[cfg(not(feature = "console"))] + fn init_tracing( + registry: Registry, + log_level: EnvFilter, + ) -> (WorkerGuard, LayerReloadHandle, FilterReloadHandle) { + use tracing_subscriber::layer::SubscriberExt as _; + use tracing_subscriber::util::SubscriberInitExt as _; + + let env_filter = EnvFilter::try_from_default_env().ok().or(Some(log_level)).unwrap_or_else(|| { + EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::ERROR.into()) + .parse_lossy("") + }); + + // Start as an stdout layer by default, after reading the configuration this can be upgraded to a file layer + let (guard, layer) = Self::stdout_layer(); + let (layer, layer_reload_handle) = reload::Layer::new(layer); + + let (env_filter, filter_reload_handle) = reload::Layer::new(env_filter); + + registry.with(env_filter).with(layer).init(); + (guard, layer_reload_handle, filter_reload_handle) + } + + #[cfg(not(feature = "console"))] + fn stdout_layer() -> (WorkerGuard, RegistryLayer) { let out = std::io::stdout(); let is_terminal = std::io::IsTerminal::is_terminal(&out); let (non_blocking, guard) = tracing_appender::non_blocking(out); @@ -73,16 +135,44 @@ fn init_tracing(log_conf: LogConf) -> WorkerGuard { std_layer = std_layer.with_ansi(false); } - Registry::default().with(env_filter).with(std_layer).init(); - guard - }, - Some(filename) => { + (guard, std_layer) + } + + #[cfg(not(feature = 
"console"))] + fn file_layer(filename: &str, log_directory: Option) -> (WorkerGuard, RegistryLayer) { let file_appender = - tracing_appender::rolling::hourly(log_conf.log_directory.as_ref().unwrap_or(&".".into()), filename); + tracing_appender::rolling::hourly(log_directory.as_ref().unwrap_or(&".".into()), filename); let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); let file_layer = fmt::layer().with_ansi(false).with_writer(non_blocking).with_thread_names(true); - Registry::default().with(env_filter).with(file_layer).init(); - guard - }, + + (guard, file_layer) + } + } +} + +#[cfg(feature = "console")] +mod proxy_tracing { + #[cfg(feature = "console")] + use console_subscriber; + + use tracing_appender::non_blocking::WorkerGuard; + + use orion_configuration::config::Log as LogConf; + use orion_lib::Result; + + pub struct TracingManager { + _guard: WorkerGuard, + } + + impl TracingManager { + pub fn new() -> Self { + let (_non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); + console_subscriber::init(); + TracingManager { _guard: guard } + } + + pub fn update(&mut self, _log_conf: LogConf) -> Result<()> { + Ok(()) + } } } From fd48dc5a28ceb04ee4c371757072759353eb8af0 Mon Sep 17 00:00:00 2001 From: Rui Ferreira Date: Thu, 6 Mar 2025 22:24:41 +0000 Subject: [PATCH 02/64] Clear backlog of clippy warnings Signed-off-by: Rui Ferreira --- Cargo.toml | 60 ++++++------------- clippy.toml | 2 - orion-configuration/src/config.rs | 11 ++-- orion-configuration/src/config/cluster.rs | 5 +- .../src/config/cluster/health_check.rs | 4 ++ .../config/cluster/http_protocol_options.rs | 6 +- orion-configuration/src/config/core.rs | 6 +- orion-configuration/src/config/listener.rs | 3 +- orion-configuration/src/config/log.rs | 1 + .../http_connection_manager.rs | 6 +- .../http_connection_manager/header_matcher.rs | 2 +- .../http_connection_manager/route.rs | 3 +- orion-configuration/src/config/runtime.rs | 1 + 
orion-configuration/src/config/transport.rs | 1 + orion-configuration/src/lib.rs | 3 +- orion-lib/src/body/timeout_body.rs | 8 ++- orion-lib/src/clusters/balancers/least.rs | 1 + orion-lib/src/clusters/balancers/random.rs | 1 + orion-lib/src/clusters/cached_watch.rs | 2 +- .../src/clusters/health/checkers/grpc/mod.rs | 6 +- orion-lib/src/clusters/load_assignment.rs | 14 ++++- orion-lib/src/clusters/retry_policy.rs | 2 +- orion-lib/src/configuration/mod.rs | 2 +- orion-lib/src/lib.rs | 1 + orion-lib/src/listeners/filterchain.rs | 2 +- .../src/listeners/http_connection_manager.rs | 11 ++-- .../http_connection_manager/redirect.rs | 2 +- orion-lib/src/listeners/listener.rs | 17 +++--- orion-lib/src/listeners/rate_limiter/mod.rs | 16 +++-- .../src/listeners/synthetic_http_response.rs | 4 +- orion-lib/src/secrets/secrets_manager.rs | 2 +- orion-lib/src/transport/bind_device.rs | 5 +- orion-lib/src/transport/http_channel.rs | 12 ++-- orion-lib/src/transport/tls_inspector.rs | 4 +- orion-proxy/src/core_affinity.rs | 1 + orion-proxy/src/lib.rs | 7 +-- orion-proxy/src/proxy.rs | 4 +- orion-proxy/src/xds_configurator.rs | 2 +- orion-proxy/tests/configs.rs | 1 + orion-xds/Cargo.toml | 21 +------ orion-xds/examples/server_routes_and_loads.rs | 1 + orion-xds/src/xds/bindings.rs | 1 + orion-xds/src/xds/resources/mod.rs | 2 +- 43 files changed, 122 insertions(+), 144 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ff844bc1..32601b24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,49 +74,23 @@ debug = 1 inherits = "release" [workspace.lints.clippy] -correctness = { level = "deny", priority = 10 } -disallowed-methods = "deny" -expect_used = "warn" # FIXME this should be deny -panic = "warn" # FIXME this should be deny -todo = "deny" -transmute_ptr_to_ptr = "deny" +pedantic = { level = "warn", priority = -1 } +disallowed-methods = "deny" +expect_used = "warn" # FIXME this should be deny +correctness = { level = "deny", priority = 10 } +panic = "warn" # FIXME this should be 
deny +todo = "deny" +transmute_ptr_to_ptr = "deny" unchecked_duration_subtraction = "deny" -unused_async = "deny" -unwrap_used = "deny" - -# A subset of pedantic as warn -cast_lossless = "warn" -cast_possible_truncation = "warn" -cast_possible_wrap = "warn" -cast_precision_loss = "warn" -cast_ptr_alignment = "warn" -cast_sign_loss = "warn" -checked_conversions = "warn" -if_then_some_else_none = "warn" -inconsistent_struct_constructor = "allow" -invalid_upcast_comparisons = "warn" -large_futures = "warn" -large_stack_arrays = "warn" -large_types_passed_by_value = "warn" +unused_async = "deny" +unwrap_used = "deny" +uninlined_format_args = "deny" +len_without_is_empty = "deny" +trivially_copy_pass_by_ref = "allow" map_unwrap_or = "allow" -maybe_infinite_iter = "warn" -missing_errors_doc = "allow" -missing_panics_doc = "allow" -doc_markdown = "allow" -module_name_repetitions = "allow" -must_use_candidate = "allow" -needless_raw_string_hashes = "allow" -pedantic = { level = "warn", priority = -1 } -print_stderr = "warn" -print_stdout = "warn" -ptr_cast_constness = "warn" -range_minus_one = "warn" -range_plus_one = "warn" -redundant_closure_for_method_calls = "warn" -return_self_not_must_use = "warn" -same_functions_in_if_condition = "warn" +into_iter_on_ref = "allow" +inconsistent_struct_constructor = "allow" semicolon-if-nothing-returned = "allow" -similar_names = "warn" -str_to_string = "warn" -string_to_string = "warn" -unicode_not_nfc = "warn" +missing_errors_doc = "allow" +must_use_candidate = "allow" +doc_markdown = "allow" \ No newline at end of file diff --git a/clippy.toml b/clippy.toml index 106a4e09..dde7ef14 100644 --- a/clippy.toml +++ b/clippy.toml @@ -6,8 +6,6 @@ disallowed-methods = [ { path = "tokio::time::timeout", reason = "Use pingora_timeout::fast_timeout::fast_timeout instead" }, ] -# The maximum number of function parameters is 5. 
-too-many-arguments-threshold = 5 allow-expect-in-tests = true allow-print-in-tests = true diff --git a/orion-configuration/src/config.rs b/orion-configuration/src/config.rs index d3cc2c83..efaa15e9 100644 --- a/orion-configuration/src/config.rs +++ b/orion-configuration/src/config.rs @@ -122,7 +122,7 @@ mod envoy_conversions { serde_yaml::Deserializer::from_reader(&envoy_file), &mut track, )) - .with_context(|| format!("failed to deserialize {}", track.path().to_string()))?; + .with_context(|| format!("failed to deserialize {}", track.path()))?; Bootstrap::try_from(envoy).context("failed to convert into orion bootstrap") })() .with_context(|| format!("failed to read config from \"{}\"", envoy_path.as_ref().display())) @@ -137,7 +137,7 @@ mod envoy_conversions { Self { runtime: Runtime::default(), logging: Log::default(), bootstrap } }, (Some(config), maybe_override) => { - let ShimConfig { runtime, logging, bootstrap, envoy_bootstrap } = deserialize_yaml(&config) + let ShimConfig { runtime, logging, bootstrap, envoy_bootstrap } = deserialize_yaml(config) .with_context(|| format!("failed to deserialize \"{}\"", config.display()))?; let mut bootstrap = match (bootstrap, envoy_bootstrap) { (None, None) => Bootstrap::default(), @@ -149,7 +149,7 @@ mod envoy_conversions { }, }; if let Some(bootstrap_override) = maybe_override { - bootstrap = bootstrap_from_path_to_envoy_bootstrap(&bootstrap_override)?; + bootstrap = bootstrap_from_path_to_envoy_bootstrap(bootstrap_override)?; } Self { runtime, logging, bootstrap } }, @@ -199,10 +199,7 @@ mod envoy_conversions { // std::fs::write(new_path, serialized.as_bytes())?; // } let deserialized: Config = serde_yaml::from_str(&serialized)?; - if new_conf != deserialized { - tracing::info!("\n{}\n", serde_yaml::to_string(&deserialized)?); - panic!("failed to roundtrip config transcoding") - } + assert_eq!(new_conf, deserialized, "failed to roundtrip config transcoding"); } else { tracing::info!("skipping {}", path.display()) } 
diff --git a/orion-configuration/src/config/cluster.rs b/orion-configuration/src/config/cluster.rs index 25782566..c850f093 100644 --- a/orion-configuration/src/config/cluster.rs +++ b/orion-configuration/src/config/cluster.rs @@ -497,10 +497,9 @@ mod envoy_conversions { Ok(Self { endpoints }) })(); if !cluster_name.is_empty() { - ret.with_name(cluster_name) - } else { - ret + return ret.with_name(cluster_name); } + ret } } diff --git a/orion-configuration/src/config/cluster/health_check.rs b/orion-configuration/src/config/cluster/health_check.rs index 1ce6ce5d..5e27cd34 100644 --- a/orion-configuration/src/config/cluster/health_check.rs +++ b/orion-configuration/src/config/cluster/health_check.rs @@ -197,6 +197,7 @@ pub struct HttpHealthCheck { pub path: Option, } +#[allow(clippy::single_range_in_vec_init)] fn default_expected_statuses() -> Vec> { vec![200..201] } @@ -206,6 +207,7 @@ fn is_default_expected_statuses(value: &Vec>) -> bool { } impl Default for HttpHealthCheck { + #[allow(clippy::single_range_in_vec_init)] fn default() -> Self { Self { http_version: Codec::Http1, @@ -349,6 +351,7 @@ mod envoy_conversions { .with_node("interval_jitter") })?; let interval_jitter_percent = { + #[allow(clippy::cast_precision_loss)] let as_float = interval_jitter_percent as f32; if !as_float.is_finite() || as_float > 1.0 || as_float.is_sign_negative() { Err(GenericError::from_msg(format!( @@ -431,6 +434,7 @@ mod envoy_conversions { } else if start < 100 || end >= 600 { Err(GenericError::from_msg("invalid range [{start},{end}). 
Range has to be within [100,600).")) } else { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] Ok((start as u16)..(end as u16)) } }) diff --git a/orion-configuration/src/config/cluster/http_protocol_options.rs b/orion-configuration/src/config/cluster/http_protocol_options.rs index 1942d4cd..d2d21451 100644 --- a/orion-configuration/src/config/cluster/http_protocol_options.rs +++ b/orion-configuration/src/config/cluster/http_protocol_options.rs @@ -78,7 +78,7 @@ enum ExplicitProtocolOptions { impl Default for ExplicitProtocolOptions { fn default() -> Self { - Self::Http1(Http1ProtocolOptions::default()) + Self::Http1(Http1ProtocolOptions) } } @@ -239,10 +239,10 @@ mod envoy_conversions { let common = common_http_protocol_options.map(CommonHttpOptions::try_from).transpose()?.unwrap_or_default(); let (codec, http1_options, http2_options) = match upstream_protocol_options { UpstreamHttpProtocolOptions::Explicit(ExplicitProtocolOptions::Http1(http1)) => { - (Codec::Http1, http1, Default::default()) + (Codec::Http1, http1, Http2ProtocolOptions::default()) }, UpstreamHttpProtocolOptions::Explicit(ExplicitProtocolOptions::Http2(http2)) => { - (Codec::Http2, Default::default(), http2) + (Codec::Http2, Http1ProtocolOptions, http2) }, }; diff --git a/orion-configuration/src/config/core.rs b/orion-configuration/src/config/core.rs index 72e6db6c..27b17e5d 100644 --- a/orion-configuration/src/config/core.rs +++ b/orion-configuration/src/config/core.rs @@ -88,7 +88,7 @@ impl<'a> DataSourceReader<'a> { } } -impl<'a> Read for DataSourceReader<'a> { +impl Read for DataSourceReader<'_> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match self { Self::OwnedBytes { bytes, read } => { @@ -105,7 +105,7 @@ impl<'a> Read for DataSourceReader<'a> { } } -impl<'a> BufRead for DataSourceReader<'a> { +impl BufRead for DataSourceReader<'_> { fn fill_buf(&mut self) -> std::io::Result<&[u8]> { match self { Self::OwnedBytes { bytes, read } => Ok(&bytes[*read..]), @@ 
-134,7 +134,7 @@ pub struct StringMatcher { } pub(crate) struct CaseSensitive<'a>(pub bool, pub &'a str); -impl<'a> CaseSensitive<'a> { +impl CaseSensitive<'_> { #[inline] pub fn equals(&self, b: &str) -> bool { if self.0 { diff --git a/orion-configuration/src/config/listener.rs b/orion-configuration/src/config/listener.rs index b8b9d16e..b10af2c7 100644 --- a/orion-configuration/src/config/listener.rs +++ b/orion-configuration/src/config/listener.rs @@ -178,7 +178,7 @@ impl FilterChainMatch { let bits_matched = match ip { IpAddr::V4(_) => 32, IpAddr::V6(_) => 128, - } - (range.prefix_len() as u32); + } - u32::from(range.prefix_len()); MatchResult::Matched(bits_matched) } else { MatchResult::FailedMatch @@ -593,6 +593,7 @@ mod envoy_conversions { } } + #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] enum SupportedEnvoyFilter { HttpConnectionManager(EnvoyHttpConnectionManager), diff --git a/orion-configuration/src/config/log.rs b/orion-configuration/src/config/log.rs index 3ca03f81..5cfa9a43 100644 --- a/orion-configuration/src/config/log.rs +++ b/orion-configuration/src/config/log.rs @@ -54,6 +54,7 @@ where }) } +#[allow(clippy::ref_option)] fn serialize_log_level( value: &Option, serializer: S, diff --git a/orion-configuration/src/config/network_filters/http_connection_manager.rs b/orion-configuration/src/config/network_filters/http_connection_manager.rs index 5221c8e8..66d3743d 100644 --- a/orion-configuration/src/config/network_filters/http_connection_manager.rs +++ b/orion-configuration/src/config/network_filters/http_connection_manager.rs @@ -777,13 +777,13 @@ mod envoy_conversions { } } + // In the original Protobuf specification, this enum is `oneof` rds, route_config or scoped_routes, + // this is why the name of the field is manually added in case an error happens. 
impl TryFrom> for RouteSpecifier { type Error = GenericError; fn try_from(envoy: Option) -> Result { Ok(match envoy { - Some(EnvoyRouteSpecifier::Rds(rds)) => { - Self::Rds(RdsSpecifier::try_from(rds).map_err(|e| e.with_node("rds"))?) - }, + Some(EnvoyRouteSpecifier::Rds(rds)) => Self::Rds(RdsSpecifier::try_from(rds).with_node("rds")?), Some(EnvoyRouteSpecifier::RouteConfig(envoy)) => { Self::RouteConfig(envoy.try_into().with_node("route_config")?) }, diff --git a/orion-configuration/src/config/network_filters/http_connection_manager/header_matcher.rs b/orion-configuration/src/config/network_filters/http_connection_manager/header_matcher.rs index ff554dc5..0fd257b3 100644 --- a/orion-configuration/src/config/network_filters/http_connection_manager/header_matcher.rs +++ b/orion-configuration/src/config/network_filters/http_connection_manager/header_matcher.rs @@ -52,7 +52,7 @@ impl<'de> Deserialize<'de> for HeaderNames { D: serde::Deserializer<'de>, { struct StrVisitor; - impl<'de> Visitor<'de> for StrVisitor { + impl Visitor<'_> for StrVisitor { type Value = HeaderNames; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { formatter.write_str("`str`") diff --git a/orion-configuration/src/config/network_filters/http_connection_manager/route.rs b/orion-configuration/src/config/network_filters/http_connection_manager/route.rs index af656487..c56ff027 100644 --- a/orion-configuration/src/config/network_filters/http_connection_manager/route.rs +++ b/orion-configuration/src/config/network_filters/http_connection_manager/route.rs @@ -87,7 +87,7 @@ pub enum PathRewriteSpecifier { impl PathRewriteSpecifier { /// will preserve the query part of the input if the replacement does not contain one - #[must_use] + #[allow(clippy::redundant_else)] pub fn apply( &self, path_and_query: Option<&PathAndQuery>, @@ -255,6 +255,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15); const fn default_timeout_deser() -> Option { Some(DEFAULT_TIMEOUT) } 
+#[allow(clippy::ref_option)] fn is_default_timeout(timeout: &Option) -> bool { *timeout == default_timeout_deser() } diff --git a/orion-configuration/src/config/runtime.rs b/orion-configuration/src/config/runtime.rs index a38233ea..7f280dbc 100644 --- a/orion-configuration/src/config/runtime.rs +++ b/orion-configuration/src/config/runtime.rs @@ -96,6 +96,7 @@ impl Runtime { } } +#[allow(clippy::expect_used)] pub(crate) fn non_zero_num_cpus() -> NonZeroUsize { NonZeroUsize::try_from(num_cpus::get()).expect("found zero cpus") } diff --git a/orion-configuration/src/config/transport.rs b/orion-configuration/src/config/transport.rs index 06e5ccc4..f4e39f35 100644 --- a/orion-configuration/src/config/transport.rs +++ b/orion-configuration/src/config/transport.rs @@ -447,6 +447,7 @@ mod envoy_conversions { } } + #[allow(clippy::large_enum_variant)] pub(crate) enum SupportedEnvoyTransportSocket { DownstreamTlsContext(EnvoyDownstreamTlsContext), UpstreamTlsContext(EnvoyUpstreamTlsContext), diff --git a/orion-configuration/src/lib.rs b/orion-configuration/src/lib.rs index 3182a3a4..e6dfc34c 100644 --- a/orion-configuration/src/lib.rs +++ b/orion-configuration/src/lib.rs @@ -22,6 +22,7 @@ use orion_error::Error; pub(crate) type BoxedError = Box; +#[allow(clippy::wildcard_imports, clippy::too_many_lines)] pub mod config; pub mod options; @@ -74,7 +75,7 @@ mod tests { name: "http-router".to_owned(), config_type: Some(ConfigType::TypedConfig(Any { type_url: "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router".to_owned(), - value: vec![].into(), + value: vec![], })), ..Default::default() }], diff --git a/orion-lib/src/body/timeout_body.rs b/orion-lib/src/body/timeout_body.rs index 7b601233..0bb1e4c3 100644 --- a/orion-lib/src/body/timeout_body.rs +++ b/orion-lib/src/body/timeout_body.rs @@ -32,7 +32,7 @@ use http_body::Body; use pin_project::pin_project; use pingora_timeout::fast_timeout::{fast_timeout, FastTimeout}; -use pingora_timeout::Timeout; +use 
pingora_timeout::Timeout as PingoraTimeout; use std::{ future::{pending, Future, Pending}, pin::Pin, @@ -40,11 +40,13 @@ use std::{ time::Duration, }; +pub type Timeout = PingoraTimeout, FastTimeout>; + #[pin_project] pub struct TimeoutBody { timeout: Option, #[pin] - sleep: Option, FastTimeout>>>>, + sleep: Option>>, #[pin] body: B, } @@ -78,7 +80,7 @@ where }; // Error if the timeout has expired. - if let Poll::Ready(_) = sleep_pinned.poll(cx) { + if sleep_pinned.poll(cx).is_ready() { return Poll::Ready(Some(Err(TimeoutBodyError::TimedOut))); } diff --git a/orion-lib/src/clusters/balancers/least.rs b/orion-lib/src/clusters/balancers/least.rs index 04fc2dc4..3c96e8f3 100644 --- a/orion-lib/src/clusters/balancers/least.rs +++ b/orion-lib/src/clusters/balancers/least.rs @@ -69,6 +69,7 @@ impl WeightedLeastRequestBalancer { Self::new_with_settings(items, DEFAULT_ACTIVE_REQUEST_BIAS, u32::from(DEFAULT_P2C_CHOICE_COUNT)) } + #[allow(clippy::expect_used)] fn new_with_settings( items: impl IntoIterator>, active_request_bias: f32, diff --git a/orion-lib/src/clusters/balancers/random.rs b/orion-lib/src/clusters/balancers/random.rs index 2e51ab58..ad902527 100644 --- a/orion-lib/src/clusters/balancers/random.rs +++ b/orion-lib/src/clusters/balancers/random.rs @@ -36,6 +36,7 @@ pub struct RandomBalancer { } impl RandomBalancer { + #[allow(clippy::expect_used)] pub fn new(items: impl IntoIterator>) -> Self { let rng = SmallRng::from_rng(rand::thread_rng()).expect("RNG must be valid"); RandomBalancer::new_with_rng(items, rng) diff --git a/orion-lib/src/clusters/cached_watch.rs b/orion-lib/src/clusters/cached_watch.rs index 0b1a80f7..ba1fb82b 100644 --- a/orion-lib/src/clusters/cached_watch.rs +++ b/orion-lib/src/clusters/cached_watch.rs @@ -66,7 +66,7 @@ pub struct CachedWatcher<'a, T: Clone> { local: T, } -impl<'a, T: Clone> CachedWatcher<'a, T> { +impl CachedWatcher<'_, T> { pub fn cached_or_latest(&mut self) -> &mut T { let parent_version = self.parent.version(); if 
parent_version != self.version { diff --git a/orion-lib/src/clusters/health/checkers/grpc/mod.rs b/orion-lib/src/clusters/health/checkers/grpc/mod.rs index 42833e96..4213a72b 100644 --- a/orion-lib/src/clusters/health/checkers/grpc/mod.rs +++ b/orion-lib/src/clusters/health/checkers/grpc/mod.rs @@ -31,6 +31,7 @@ use orion_xds::grpc_deps::tonic_health::pb::{ health_check_response::ServingStatus, HealthCheckRequest, HealthCheckResponse, }; use orion_xds::grpc_deps::{Response as TonicResponse, Status as TonicStatus}; +use std::future::Future; use tokio::sync::{mpsc, Notify}; use tokio::task::JoinHandle; @@ -118,10 +119,7 @@ where { type Response = HealthCheckResponse; - fn check( - &mut self, - ) -> impl futures::Future::Response, orion_error::Error>> - + std::marker::Send { + fn check(&mut self) -> impl Future> + Send { async move { let request = HealthCheckRequest { service: self.config.service_name.clone().into() }; Ok(self.channel.check(request).await.map(TonicResponse::into_inner)?) diff --git a/orion-lib/src/clusters/load_assignment.rs b/orion-lib/src/clusters/load_assignment.rs index 7f1ca20d..1029b3e8 100644 --- a/orion-lib/src/clusters/load_assignment.rs +++ b/orion-lib/src/clusters/load_assignment.rs @@ -266,12 +266,20 @@ impl LocalityLbEndpointsBuilder { .build() }) .collect::>()?; + + let total_endpoints_usize = endpoints.len(); + let healthy_endpoints_usize = endpoints.iter().filter(|e| e.health_status.is_healthy()).count(); + + let (Ok(total_endpoints), Ok(healthy_endpoints)) = + (u32::try_from(total_endpoints_usize), u32::try_from(healthy_endpoints_usize)) + else { + return Err("Too many endpoints".into()); + }; + // we divide by 100 because we multiply by 100 later to calculate a percentage - if endpoints.len() > (u32::MAX / 100) as usize { + if healthy_endpoints > u32::MAX / 100 { return Err("Too many endpoints".into()); } - let healthy_endpoints = endpoints.iter().filter(|e| e.health_status.is_healthy()).count() as u32; - let total_endpoints = 
endpoints.len() as u32; Ok(LocalityLbEndpoints { name: cluster_name, diff --git a/orion-lib/src/clusters/retry_policy.rs b/orion-lib/src/clusters/retry_policy.rs index ab1c7801..63b3f976 100644 --- a/orion-lib/src/clusters/retry_policy.rs +++ b/orion-lib/src/clusters/retry_policy.rs @@ -72,7 +72,7 @@ impl<'a, B: Body> FailureKind<'a, B> { if resp.status().is_informational() || resp.status().is_success() { return None; } - return Some(FailureKind::EligibleForRetry(resp)); + Some(FailureKind::EligibleForRetry(resp)) }, Err(err) => Self::try_infer_from_error(err.as_ref()), } diff --git a/orion-lib/src/configuration/mod.rs b/orion-lib/src/configuration/mod.rs index f8ad3926..05a5d6a0 100644 --- a/orion-lib/src/configuration/mod.rs +++ b/orion-lib/src/configuration/mod.rs @@ -31,7 +31,7 @@ pub fn get_listeners_and_clusters( let static_resources = bootstrap.static_resources; let secrets = static_resources.secrets; let mut secret_manager = SecretManager::new(); - secrets.into_iter().try_for_each(|secret| secret_manager.add(secret).map(|_| ()))?; + secrets.into_iter().try_for_each(|secret| secret_manager.add(&secret).map(|_| ()))?; let listeners = static_resources .listeners diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 5897bd76..9f4563d9 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -56,6 +56,7 @@ pub type HttpBody = PolyBody; pub static RUNTIME_CONFIG: OnceLock = OnceLock::new(); +#[allow(clippy::expect_used, clippy::missing_panics_doc)] pub fn runtime_config() -> &'static Runtime { RUNTIME_CONFIG.get().expect("Called runtime_config without setting RUNTIME_CONFIG first") } diff --git a/orion-lib/src/listeners/filterchain.rs b/orion-lib/src/listeners/filterchain.rs index 3b1bb1c0..d9d67bf4 100644 --- a/orion-lib/src/listeners/filterchain.rs +++ b/orion-lib/src/listeners/filterchain.rs @@ -213,7 +213,7 @@ impl FilterchainType { stream, hyper::service::service_fn(|req: Request| { let handler_req = HttpHandlerRequest { request: req, 
source_addr: peer_addr }; - req_handler.call(handler_req).map_err(|e| e.inner()) + req_handler.call(handler_req).map_err(orion_error::Error::inner) }), ) .await diff --git a/orion-lib/src/listeners/http_connection_manager.rs b/orion-lib/src/listeners/http_connection_manager.rs index d1c788a6..2b99fe80 100644 --- a/orion-lib/src/listeners/http_connection_manager.rs +++ b/orion-lib/src/listeners/http_connection_manager.rs @@ -201,8 +201,8 @@ impl fmt::Display for HttpConnectionManager { } impl HttpConnectionManager { - pub fn get_route_id(&self) -> &Option { - &self.dynamic_route_name + pub fn get_route_id(&self) -> Option<&CompactString> { + self.dynamic_route_name.as_ref() } pub fn update_route(&self, route: Arc) { @@ -323,9 +323,6 @@ impl Service for HttpRequestHandler { // This function should check permissions and return forbidden response if access is denied fn apply_authorization_rules(rbac: &HttpRbac, req: &Request) -> Option> { debug!("Applying authorization rules {rbac:?} {:?}", &req.headers()); - if !rbac.is_permitted(req) { - Some(SyntheticHttpResponse::forbidden("RBAC: access denied").into_response(req.version())) - } else { - None - } + (!rbac.is_permitted(req)) + .then(|| SyntheticHttpResponse::forbidden("RBAC: access denied").into_response(req.version())) } diff --git a/orion-lib/src/listeners/http_connection_manager/redirect.rs b/orion-lib/src/listeners/http_connection_manager/redirect.rs index 4cd8b7ae..a9e86069 100644 --- a/orion-lib/src/listeners/http_connection_manager/redirect.rs +++ b/orion-lib/src/listeners/http_connection_manager/redirect.rs @@ -43,7 +43,7 @@ impl RequestHandler<(Request>, RouteMatchResult)> for &Red let UriParts { scheme: orig_scheme, authority: orig_authority, path_and_query: orig_path_and_query, .. 
} = parts.uri.into_parts(); let orig_host = orig_authority.as_ref().map(Authority::host); - let orig_port = orig_authority.as_ref().map(Authority::port_u16).flatten(); + let orig_port = orig_authority.as_ref().and_then(Authority::port_u16); let authority = match (self.authority_redirect.as_ref(), (orig_host, orig_port)) { //no redirect (None, _) => orig_authority, diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index 782bf86f..25e4406c 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -213,10 +213,6 @@ impl Listener { destination_addr: SocketAddr, server_name: Option<&str>, ) -> Result> { - //todo: smallvec? other optimization? - let mut possible_filters = vec![true; filter_chains.len()]; - let mut scratchpad = vec![MatchResult::NoRule; filter_chains.len()]; - fn match_subitem<'a, F: Fn(&FilterChainMatch, T) -> MatchResult, T: Copy>( function: F, comparand: T, @@ -242,6 +238,10 @@ impl Listener { } } + //todo: smallvec? other optimization? 
+ let mut possible_filters = vec![true; filter_chains.len()]; + let mut scratchpad = vec![MatchResult::NoRule; filter_chains.len()]; + match_subitem( FilterChainMatch::matches_destination_port, destination_addr.port(), @@ -324,9 +324,8 @@ impl Listener { ); if let Some(stream) = filterchain.apply_rbac(stream, local_address, peer_addr, server_name.as_deref()) { return filterchain.start_filterchain(stream).await; - } else { - debug!("{listener_name} : dropped connection from {peer_addr} due to rbac"); } + debug!("{listener_name} : dropped connection from {peer_addr} due to rbac"); } else { warn!("{listener_name} : No match for {peer_addr} {local_address}"); } @@ -553,14 +552,14 @@ filter_chains: ", ) .unwrap(); - let m: HashMap = std::iter::once((m.try_into().unwrap(), ())).collect(); + let m = std::iter::once((m.try_into().unwrap(), ())).collect(); let good_source = (Ipv4Addr::LOCALHOST, 3300).into(); let good_destination = (Ipv4Addr::LOCALHOST, 443).into(); let good_host = Some("host.test"); - assert!(matches!(Listener::select_filterchain(&m, good_source, good_destination, good_host), Ok(Some(_)))); + assert!(matches!(Listener::select_filterchain(&m, good_source, good_destination, good_host), Ok(Some(())))); assert!(matches!( Listener::select_filterchain(&m, good_source, good_destination, Some("a.wildcard")), - Ok(Some(_)) + Ok(Some(())) )); assert!(matches!(Listener::select_filterchain(&m, good_source, good_destination, None), Ok(None))); assert!(matches!( diff --git a/orion-lib/src/listeners/rate_limiter/mod.rs b/orion-lib/src/listeners/rate_limiter/mod.rs index e53639e8..68fd1cb4 100644 --- a/orion-lib/src/listeners/rate_limiter/mod.rs +++ b/orion-lib/src/listeners/rate_limiter/mod.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use http::status::StatusCode; use http::Request; use hyper::Response; +use tracing::warn; use token_bucket::TokenBucket; @@ -33,6 +34,7 @@ use orion_configuration::config::network_filters::http_connection_manager::http_ use 
crate::listeners::synthetic_http_response::SyntheticHttpResponse; use crate::{runtime_config, HttpBody}; +#[allow(dead_code)] #[derive(Debug, Clone)] // TODO: Implement rate limiting functionality - this struct defines the interface for local rate limits pub struct LocalRateLimit { @@ -40,6 +42,7 @@ pub struct LocalRateLimit { pub token_bucket: Arc, } +#[allow(dead_code)] impl LocalRateLimit { // TODO: Implement rate limit enforcement - used to check and consume tokens for incoming requests pub fn run(&self, req: &Request) -> Option> { @@ -57,11 +60,14 @@ impl From for LocalRateLimit { let max_tokens = rate_limit.max_tokens; let tokens_per_fill = rate_limit.tokens_per_fill; let fill_interval = rate_limit.fill_interval; - let token_bucket = Arc::new(TokenBucket::new( - max_tokens, - tokens_per_fill, - fill_interval.checked_mul(runtime_config().num_runtimes.into()).expect("too many runtimes (overflow)"), - )); + let adjusted_fill_interval = fill_interval.checked_mul(runtime_config().num_runtimes.into()); + let fill_interval = if let Some(value) = adjusted_fill_interval { + value + } else { + warn!("failed to adjust fill interval to number of configured runtimes (overflow)"); + fill_interval + }; + let token_bucket = Arc::new(TokenBucket::new(max_tokens, tokens_per_fill, fill_interval)); Self { status, token_bucket } } diff --git a/orion-lib/src/listeners/synthetic_http_response.rs b/orion-lib/src/listeners/synthetic_http_response.rs index 80d2ad89..872bafc7 100644 --- a/orion-lib/src/listeners/synthetic_http_response.rs +++ b/orion-lib/src/listeners/synthetic_http_response.rs @@ -29,7 +29,7 @@ use http_body_util::Full; // TODO: Error type for synthetic response validation - implement validation logic or remove if unused pub enum InvalidSyntheticResponse { #[error(transparent)] - InvalidHttpResponse(#[from] http::Error), + BadHttpResponse(#[from] http::Error), #[error(transparent)] InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), #[error(transparent)] @@ 
-56,7 +56,7 @@ impl SyntheticHttpResponse { Self { http_status: StatusCode::BAD_GATEWAY, body: Bytes::default(), close_connection: true } } - // TODO: Implement forbidden response functionality - used for access control features + #[allow(dead_code)] pub fn forbidden(msg: &str) -> Self { Self { http_status: StatusCode::FORBIDDEN, diff --git a/orion-lib/src/secrets/secrets_manager.rs b/orion-lib/src/secrets/secrets_manager.rs index 6d7e8d99..4cee6449 100644 --- a/orion-lib/src/secrets/secrets_manager.rs +++ b/orion-lib/src/secrets/secrets_manager.rs @@ -138,7 +138,7 @@ impl SecretManager { Self { certificate_secrets: HashMap::default(), validation_contexts: HashMap::default() } } - pub fn add(&mut self, secret: Secret) -> Result { + pub fn add(&mut self, secret: &Secret) -> Result { let secret_id = secret.name(); let secret = match secret.kind() { Type::TlsCertificate(certificate) => { diff --git a/orion-lib/src/transport/bind_device.rs b/orion-lib/src/transport/bind_device.rs index 1f8412d7..7ccffb03 100644 --- a/orion-lib/src/transport/bind_device.rs +++ b/orion-lib/src/transport/bind_device.rs @@ -34,6 +34,7 @@ pub(crate) fn bind_device(_: &tokio::net::TcpSocket, _: &BindDevice) -> std::io: #[cfg(test)] mod tests { + #![allow(clippy::manual_c_str_literals)] use std::ffi::CStr; use super::*; @@ -160,7 +161,7 @@ mod tests { fn direct_decode_bytes() { let yaml = "interface_bytes: YRs="; let iface = CStr::from_bytes_with_nul(b"a\x1b\0").unwrap(); - let bd: BindDevice = serde_yaml::from_str(&yaml).unwrap(); + let bd: BindDevice = serde_yaml::from_str(yaml).unwrap(); assert_eq!(iface, bd.interface()) } @@ -168,7 +169,7 @@ mod tests { fn direct_decode_iface() { let yaml = "interface: eth0"; let iface = CStr::from_bytes_with_nul(b"eth0\0").unwrap(); - let bd: BindDevice = serde_yaml::from_str(&yaml).unwrap(); + let bd: BindDevice = serde_yaml::from_str(yaml).unwrap(); assert_eq!(iface, bd.interface()) } } diff --git a/orion-lib/src/transport/http_channel.rs 
b/orion-lib/src/transport/http_channel.rs index 0b011eb3..53a0506a 100644 --- a/orion-lib/src/transport/http_channel.rs +++ b/orion-lib/src/transport/http_channel.rs @@ -122,7 +122,7 @@ impl HttpChannelBuilder { tls: None, authority: None, bind_device, - http_protocol_options: Default::default(), + http_protocol_options: HttpProtocolOptions::default(), server_name: None, connection_timeout: None, } @@ -241,12 +241,12 @@ impl<'a> RequestHandler> for &HttpChannel { let req = maybe_normalize_uri(request.req, false)?; let result = if let Some(t) = route_timeout { - match fast_timeout(t, HttpChannel::send_request(retry_policy.as_deref(), sender, req)).await { + match fast_timeout(t, HttpChannel::send_request(retry_policy, sender, req)).await { Ok(result) => result, Err(_) => (Err(EventError::RouteTimeout.into()), t), } } else { - HttpChannel::send_request(retry_policy.as_deref(), sender, req).await + HttpChannel::send_request(retry_policy, sender, req).await }; HttpChannel::handle_response(result, route_timeout, version) }, @@ -261,12 +261,12 @@ impl<'a> RequestHandler> for &HttpChannel { let req = maybe_normalize_uri(request.req, true)?; let req = maybe_change_http_protocol_version(req, configured_version)?; let result = if let Some(t) = route_timeout { - match fast_timeout(t, HttpChannel::send_request(retry_policy.as_deref(), sender, req)).await { + match fast_timeout(t, HttpChannel::send_request(retry_policy, sender, req)).await { Ok(result) => result, Err(_) => (Err(EventError::RouteTimeout.into()), t), } } else { - HttpChannel::send_request(retry_policy.as_deref(), sender, req).await + HttpChannel::send_request(retry_policy, sender, req).await }; HttpChannel::handle_response(result, route_timeout, version) @@ -287,7 +287,7 @@ impl HttpChannel { where C: Connect + Clone + Send + Sync + 'static, { - if let Some(ref policy) = retry_policy { + if let Some(policy) = retry_policy { if policy.is_retriable(&req) { Self::send_with_retry(policy, sender, req).await } else { 
diff --git a/orion-lib/src/transport/tls_inspector.rs b/orion-lib/src/transport/tls_inspector.rs index d3f3a5a4..57679feb 100644 --- a/orion-lib/src/transport/tls_inspector.rs +++ b/orion-lib/src/transport/tls_inspector.rs @@ -44,7 +44,7 @@ impl<'a> TlsInspector<'a> { } } -impl<'a> AsyncRead for TlsInspector<'a> { +impl AsyncRead for TlsInspector<'_> { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -92,7 +92,7 @@ impl<'a> AsyncRead for TlsInspector<'a> { // // for now, we simply error out on any writes. The TLS protocol should not require that we write anything before receiving the initial handshake // see https://tls13.xargs.org/ -impl<'a> AsyncWrite for TlsInspector<'a> { +impl AsyncWrite for TlsInspector<'_> { fn poll_flush( self: Pin<&mut Self>, _: &mut std::task::Context<'_>, diff --git a/orion-proxy/src/core_affinity.rs b/orion-proxy/src/core_affinity.rs index 79e012f5..b09a4039 100644 --- a/orion-proxy/src/core_affinity.rs +++ b/orion-proxy/src/core_affinity.rs @@ -193,6 +193,7 @@ fn run_strategy( #[cfg(test)] mod tests { + #![allow(clippy::too_many_lines)] use super::*; #[test] diff --git a/orion-proxy/src/lib.rs b/orion-proxy/src/lib.rs index f9051244..e048ed3e 100644 --- a/orion-proxy/src/lib.rs +++ b/orion-proxy/src/lib.rs @@ -91,7 +91,7 @@ mod proxy_tracing { // Update tracing layer if necessary (stdout -> file) if let Some(log_file) = log_conf.log_file { self.layer_reload_handle.modify(|layer| { - let (new_guard, new_layer) = Self::file_layer(&log_file, log_conf.log_directory); + let (new_guard, new_layer) = Self::file_layer(&log_file, log_conf.log_directory.as_ref()); *layer = new_layer; self.guard = new_guard; })?; @@ -139,9 +139,8 @@ mod proxy_tracing { } #[cfg(not(feature = "console"))] - fn file_layer(filename: &str, log_directory: Option) -> (WorkerGuard, RegistryLayer) { - let file_appender = - tracing_appender::rolling::hourly(log_directory.as_ref().unwrap_or(&".".into()), filename); + fn file_layer(filename: 
&str, log_directory: Option<&String>) -> (WorkerGuard, RegistryLayer) { + let file_appender = tracing_appender::rolling::hourly(log_directory.unwrap_or(&".".into()), filename); let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); let file_layer = fmt::layer().with_ansi(false).with_writer(non_blocking).with_thread_names(true); diff --git a/orion-proxy/src/proxy.rs b/orion-proxy/src/proxy.rs index 460379d6..28be9476 100644 --- a/orion-proxy/src/proxy.rs +++ b/orion-proxy/src/proxy.rs @@ -148,7 +148,7 @@ fn spawn_runtime_from_thread(num_threads: usize, runtime_id: RuntimeId) -> Resul Ok((handle, configuration_senders)) } -//TODO: this is crap +#[allow(clippy::too_many_arguments)] fn xds_loop( node: Node, configuration_senders: Vec, @@ -168,7 +168,7 @@ fn xds_loop( format!("xdstask_{id}") }) .build() - .expect("failed to build basic runtime"); + .with_context(|| "failed to build basic runtime")?; runtime.block_on(async move { let secret_manager = configure_initial_resources(secret_manager, listeners, configuration_senders.clone()).await?; diff --git a/orion-proxy/src/xds_configurator.rs b/orion-proxy/src/xds_configurator.rs index 1b639197..6f0c0750 100644 --- a/orion-proxy/src/xds_configurator.rs +++ b/orion-proxy/src/xds_configurator.rs @@ -260,7 +260,7 @@ impl XdsConfigurationHandler { }, XdsResourcePayload::Secret(id, secret) => { debug!("Got update for secret {id}: {:#?}", secret); - let res = self.secret_manager.add(secret); + let res = self.secret_manager.add(&secret); match res { Ok(secret) => { diff --git a/orion-proxy/tests/configs.rs b/orion-proxy/tests/configs.rs index 49869309..28c6460e 100644 --- a/orion-proxy/tests/configs.rs +++ b/orion-proxy/tests/configs.rs @@ -1,3 +1,4 @@ +#![allow(clippy::expect_used)] use orion_configuration::config::{Config, Runtime}; use orion_configuration::options::Options; use orion_lib::configuration::get_listeners_and_clusters; diff --git a/orion-xds/Cargo.toml b/orion-xds/Cargo.toml index 
4ba3454a..6c376f54 100644 --- a/orion-xds/Cargo.toml +++ b/orion-xds/Cargo.toml @@ -25,23 +25,6 @@ uuid = { version = "1.7.0", features = ["v4"] } [dev-dependencies] serde_yaml.workspace = true -tokio.workspace = true -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -[lints.clippy] -clone_on_ref_ptr = "warn" -doc_markdown = "allow" -get_unwrap = "allow" -if_then_some_else_none = "warn" -missing_docs_in_private_items = "allow" -missing_errors_doc = "allow" -missing_panics_doc = "allow" -module_name_repetitions = "allow" -must_use_candidate = "allow" -pedantic = "warn" -print_stderr = "warn" -print_stdout = "warn" -str_to_string = "warn" -string_to_string = "warn" -todo = "deny" -unwrap_used = "deny" +[lints] +workspace = true diff --git a/orion-xds/examples/server_routes_and_loads.rs b/orion-xds/examples/server_routes_and_loads.rs index 2e326553..8497351f 100644 --- a/orion-xds/examples/server_routes_and_loads.rs +++ b/orion-xds/examples/server_routes_and_loads.rs @@ -1,3 +1,4 @@ +#![allow(clippy::expect_used)] use std::{future::IntoFuture, time::Duration}; use orion_xds::xds::{ diff --git a/orion-xds/src/xds/bindings.rs b/orion-xds/src/xds/bindings.rs index 22f502ee..da6ead3e 100644 --- a/orion-xds/src/xds/bindings.rs +++ b/orion-xds/src/xds/bindings.rs @@ -18,6 +18,7 @@ // // +#![allow(dead_code)] use std::{future::Future, pin::Pin}; use model::TypeUrl; diff --git a/orion-xds/src/xds/resources/mod.rs b/orion-xds/src/xds/resources/mod.rs index 7bac2a09..1318b314 100644 --- a/orion-xds/src/xds/resources/mod.rs +++ b/orion-xds/src/xds/resources/mod.rs @@ -347,7 +347,7 @@ pub fn create_listener( type_url: "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager" .to_owned(), - value: http_connection_manager.encode_to_vec().into(), + value: http_connection_manager.encode_to_vec(), }; let http_connection_manager_filter = Filter { From 814a88d3849db4db16d4485c31294bad8f5a0455 Mon Sep 17 00:00:00 2001 From: 
Alan Keane Date: Fri, 28 Feb 2025 12:30:00 +0000 Subject: [PATCH 03/64] Fix logical errors for slow arriving ClientHello - fix broken comparison where n_bytes >= *bytes_read would error on new data - fix incorrect read buffer alignment for init and outbound copy - change read buffer size to be adaptive Signed-off-by: Alan Keane --- orion-lib/src/transport/tls_inspector.rs | 33 ++++++++++++++---------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/orion-lib/src/transport/tls_inspector.rs b/orion-lib/src/transport/tls_inspector.rs index 57679feb..0300e045 100644 --- a/orion-lib/src/transport/tls_inspector.rs +++ b/orion-lib/src/transport/tls_inspector.rs @@ -19,7 +19,7 @@ // use rustls::server::Acceptor; -use std::{pin::Pin, task::Poll}; +use std::{cmp::Ordering, pin::Pin, task::Poll}; use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpStream, @@ -61,24 +61,29 @@ impl AsyncRead for TlsInspector<'_> { poll.map(|p| p.map(|_| ())) } } else { - //if we have little space left in the buffer, grow it. - // maximum size should be capped by rustls failing the handshake. 
- if buffer.len().checked_sub(*bytes_read).unwrap_or_default() <= 512 { - buffer.resize(buffer.len() + 4 * (1 << 10), 0); + let write_capacity = buf.remaining(); + let internal_buffer_bound = *bytes_read + write_capacity; + if buffer.len() < internal_buffer_bound { + buffer.resize(internal_buffer_bound, 0); } - let mut peek_read_buf = tokio::io::ReadBuf::new(&mut buffer[..buf.remaining()]); + let mut peek_read_buf = tokio::io::ReadBuf::new(buffer); let poll = Pin::new(stream).poll_peek(cx, &mut peek_read_buf); if let Poll::Ready(Ok(n_bytes)) = poll { - //this should never fail, as that would imply we peeked less bytes than we did previously - if n_bytes >= *bytes_read { - return Poll::Ready(Err(std::io::Error::other( + match n_bytes.cmp(bytes_read) { + Ordering::Greater => { + let newly_read = &peek_read_buf.filled()[*bytes_read..n_bytes]; + buf.put_slice(newly_read); + *bytes_read = n_bytes; + Poll::Ready(Ok(())) + }, + Ordering::Equal => { + // TODO this case potentially problematic + Poll::Ready(Ok(())) + }, + Ordering::Less => Poll::Ready(Err(std::io::Error::other( "TLS inspector peeked less bytes than it did in a previous iteration", - ))); + ))), } - let newly_read = &peek_read_buf.filled()[*bytes_read..]; - buf.put_slice(newly_read); - *bytes_read = n_bytes; - Poll::Ready(Ok(())) } else { poll.map(|p| p.map(|_| ())) } From 7f6b556e3e86e9461f6f7378240dc1a7c1fc41e7 Mon Sep 17 00:00:00 2001 From: Alan Keane Date: Wed, 5 Mar 2025 10:06:35 +0000 Subject: [PATCH 04/64] Keep task alive when no data available Signed-off-by: Alan Keane --- orion-lib/src/transport/tls_inspector.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orion-lib/src/transport/tls_inspector.rs b/orion-lib/src/transport/tls_inspector.rs index 0300e045..07c25a16 100644 --- a/orion-lib/src/transport/tls_inspector.rs +++ b/orion-lib/src/transport/tls_inspector.rs @@ -77,8 +77,8 @@ impl AsyncRead for TlsInspector<'_> { Poll::Ready(Ok(())) }, Ordering::Equal => { - // TODO this 
case potentially problematic - Poll::Ready(Ok(())) + cx.waker().wake_by_ref(); + Poll::Pending }, Ordering::Less => Poll::Ready(Err(std::io::Error::other( "TLS inspector peeked less bytes than it did in a previous iteration", From f644cd0a3fe3b85055a1361373e7a50a613edf38 Mon Sep 17 00:00:00 2001 From: Hayley Deckers Date: Tue, 12 Nov 2024 14:44:27 +0000 Subject: [PATCH 05/64] rename traits, allow for context on options Signed-off-by: Hayley Deckers --- orion-configuration/examples/convert.rs | 2 +- orion-configuration/src/config.rs | 2 +- orion-configuration/src/config/log.rs | 3 +- orion-configuration/src/main.rs | 2 +- orion-error/src/lib.rs | 46 +++++++++++-------- .../http_connection_manager/redirect.rs | 2 +- .../http_connection_manager/route.rs | 2 +- orion-proxy/src/proxy.rs | 2 +- 8 files changed, 35 insertions(+), 26 deletions(-) diff --git a/orion-configuration/examples/convert.rs b/orion-configuration/examples/convert.rs index 6dbad453..9c819061 100644 --- a/orion-configuration/examples/convert.rs +++ b/orion-configuration/examples/convert.rs @@ -1,6 +1,6 @@ #![allow(clippy::print_stdout)] use orion_configuration::config::Bootstrap; -use orion_error::{Result, ResultExtension}; +use orion_error::{Context, Result}; use std::fs::File; fn main() -> Result<()> { diff --git a/orion-configuration/src/config.rs b/orion-configuration/src/config.rs index efaa15e9..582b9395 100644 --- a/orion-configuration/src/config.rs +++ b/orion-configuration/src/config.rs @@ -96,7 +96,7 @@ mod envoy_conversions { }; use orion_data_plane_api::decode::from_serde_deserializer; pub use orion_data_plane_api::envoy_data_plane_api::envoy::config::bootstrap::v3::Bootstrap as EnvoyBootstrap; - use orion_error::ResultExtension; + use orion_error::Context; use serde::Deserialize; #[derive(Deserialize)] diff --git a/orion-configuration/src/config/log.rs b/orion-configuration/src/config/log.rs index 5cfa9a43..301bbfb0 100644 --- a/orion-configuration/src/config/log.rs +++ 
b/orion-configuration/src/config/log.rs @@ -18,6 +18,7 @@ // // +use orion_error::Error; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use tracing_subscriber::EnvFilter; @@ -48,7 +49,7 @@ where Option::::deserialize(deserializer).and_then(|maybe_string| { maybe_string.map(|s| EnvFilter::builder().parse(s)).transpose().map_err( |e: tracing_subscriber::filter::ParseError| { - serde::de::Error::custom(format!("failed to deserialize log level because of \"{e}\"")) + serde::de::Error::custom(Error::from(e).context("failed to parse log level config")) }, ) }) diff --git a/orion-configuration/src/main.rs b/orion-configuration/src/main.rs index 440ab19a..6984fa9b 100644 --- a/orion-configuration/src/main.rs +++ b/orion-configuration/src/main.rs @@ -20,7 +20,7 @@ // use orion_configuration::{config::Config, options::Options, Result}; -use orion_error::ResultExtension; +use orion_error::Context; fn main() -> Result<()> { let config = Config::new(&Options::from_path("bootstrap.yaml"))?; diff --git a/orion-error/src/lib.rs b/orion-error/src/lib.rs index 48adce77..fd34f447 100644 --- a/orion-error/src/lib.rs +++ b/orion-error/src/lib.rs @@ -54,7 +54,7 @@ impl Display for Error { impl Error { #[must_use] pub fn context>>(self, msg: T) -> Self { - Self(ErrorImpl::Msg(msg.into(), self.0.into())) + Self(ErrorImpl::Msg(msg.into(), self.0.into_boxed_err())) } pub fn inner(self) -> impl ErrorTrait + Send + Sync + 'static { @@ -95,6 +95,16 @@ enum ErrorImpl { Msg(Cow<'static, str>, BoxedErr), } +impl ErrorImpl { + fn into_boxed_err(self) -> BoxedErr { + if let Self::Error(boxed) = self { + boxed + } else { + self.into() + } + } +} + impl ErrorTrait for ErrorImpl { fn source(&self) -> Option<&(dyn ErrorTrait + 'static)> { match self { @@ -140,37 +150,25 @@ impl Display for ErrorImpl { } } -// alows for doing error.context("some description") on any error -pub trait ErrorExtension { - #[must_use] - fn context>>(self, context: T) -> Error; -} - -impl ErrorExtension for E 
{ - fn context>>(self, msg: T) -> Error { - Error(ErrorImpl::Msg(msg.into(), self.into())) - } -} - -// alows for doing error.context("some description") on results -pub trait ResultExtension { +// allows for doing error.context("some description") +pub trait Context { type T; fn context>>(self, context: Msg) -> Result; fn with_context Msg, Msg: Into>>(self, context_fn: F) -> Result; } -impl ResultExtension for StdResult { +impl Context for StdResult { type T = T; fn context>>(self, context: Msg) -> Result { - self.map_err(|e| e.context(context)) + self.map_err(|e| Error::from(e).context(context)) } fn with_context Msg, Msg: Into>>(self, context_fn: F) -> Result { - self.map_err(|e| e.context(context_fn())) + self.map_err(|e| Error::from(e).context(context_fn())) } } // Error does not implement the ErrorTrait, so the previous impl does not apply to it -impl ResultExtension for Result { +impl Context for Result { type T = T; fn context>>(self, context: Msg) -> Result { self.map_err(|e| e.context(context)) @@ -179,3 +177,13 @@ impl ResultExtension for Result { self.map_err(|e| e.context(context_fn())) } } + +impl Context for Option { + type T = T; + fn context>>(self, msg: C) -> Result { + self.ok_or_else(|| Error::from(msg.into())) + } + fn with_context C, C: Into>>(self, context_fn: F) -> Result { + self.ok_or_else(|| Error::from(context_fn().into())) + } +} diff --git a/orion-lib/src/listeners/http_connection_manager/redirect.rs b/orion-lib/src/listeners/http_connection_manager/redirect.rs index a9e86069..bcedc269 100644 --- a/orion-lib/src/listeners/http_connection_manager/redirect.rs +++ b/orion-lib/src/listeners/http_connection_manager/redirect.rs @@ -29,7 +29,7 @@ use hyper::{body::Incoming, Request, Response}; use orion_configuration::config::network_filters::http_connection_manager::route::{ AuthorityRedirect, RedirectAction, RouteMatchResult, }; -use orion_error::ResultExtension; +use orion_error::Context; use std::str::FromStr; impl 
RequestHandler<(Request>, RouteMatchResult)> for &RedirectAction { diff --git a/orion-lib/src/listeners/http_connection_manager/route.rs b/orion-lib/src/listeners/http_connection_manager/route.rs index 6888440d..af53d650 100644 --- a/orion-lib/src/listeners/http_connection_manager/route.rs +++ b/orion-lib/src/listeners/http_connection_manager/route.rs @@ -32,7 +32,7 @@ use orion_configuration::config::network_filters::http_connection_manager::{ route::{RouteAction, RouteMatchResult}, VirtualHost, }; -use orion_error::ResultExtension; +use orion_error::Context; use std::net::SocketAddr; use tracing::debug; diff --git a/orion-proxy/src/proxy.rs b/orion-proxy/src/proxy.rs index 28be9476..266c20bd 100644 --- a/orion-proxy/src/proxy.rs +++ b/orion-proxy/src/proxy.rs @@ -25,7 +25,7 @@ use crate::{ }; use futures::future::join_all; use orion_configuration::config::{bootstrap::Node, Bootstrap}; -use orion_error::ResultExtension; +use orion_error::Context; use orion_lib::{ get_listeners_and_clusters, new_configuration_channel, runtime_config, ConfigurationReceivers, ConfigurationSenders, ListenerConfigurationChange, Result, SecretManager, From 6af16c9569b4056745493f870a2090c2e7c752b1 Mon Sep 17 00:00:00 2001 From: Alan Keane Date: Mon, 3 Mar 2025 10:36:26 +0000 Subject: [PATCH 06/64] Fix NACK not sent for malformed xDS messages with decoding errors Signed-off-by: Alan Keane --- orion-xds/src/xds/client.rs | 163 ++++++++++++++++++++---------------- 1 file changed, 93 insertions(+), 70 deletions(-) diff --git a/orion-xds/src/xds/client.rs b/orion-xds/src/xds/client.rs index c02dcc7d..ac65d56a 100644 --- a/orion-xds/src/xds/client.rs +++ b/orion-xds/src/xds/client.rs @@ -343,79 +343,94 @@ impl DeltaClientBackgroundWorker { let nonce = response.nonce.clone(); info!(type_url = type_url.to_string(), size = response.resources.len(), "received config resources from xDS"); - let (updates, mut pending_update_versions) = Self::map_updates(state, response, type_url); - let 
(internal_ack_tx, internal_ack_rx) = oneshot::channel::>(); - let notification = XdsUpdateEvent { updates, ack_channel: internal_ack_tx }; - self.resources_tx - .send(notification) - .await - .map_err(|e: mpsc::error::SendError| XdsError::InternalProcessingError(e.to_string()))?; - - tokio::select! { - ack = internal_ack_rx => { - match ack { - Ok(rejected_configs) => { - let error = if rejected_configs.is_empty() { - debug!( - type_url = type_url.to_string(), - nonce, - "sending ack response after processing", - ); - let tracked_resources = state.tracked.entry(type_url).or_default(); - for (resource_id, resource_version) in pending_update_versions.drain() { - tracked_resources.insert(resource_id, resource_version); - } - None - } else { - let error = rejected_configs - .into_iter() - .map(|reject| reject.to_string()) - .collect::>() - .join("; "); - debug!( - type_url = type_url.to_string(), - error, - nonce, - "rejecting configs with nack response", - ); - Some(Status { - message: error, - code: tonic::Code::InvalidArgument.into(), + let (updates, mut pending_update_versions, decoding_errors) = Self::map_updates(state, response, type_url); + + if decoding_errors.is_empty() { + let (internal_ack_tx, internal_ack_rx) = oneshot::channel::>(); + let notification = XdsUpdateEvent { updates, ack_channel: internal_ack_tx }; + self.resources_tx.send(notification).await.map_err(|e: mpsc::error::SendError| { + XdsError::InternalProcessingError(e.to_string()) + })?; + tokio::select! 
{ + ack = internal_ack_rx => { + match ack { + Ok(rejected_configs) => { + let error = if rejected_configs.is_empty() { + debug!( + type_url = type_url.to_string(), + nonce, + "sending ack response after processing", + ); + let tracked_resources = state.tracked.entry(type_url).or_default(); + for (resource_id, resource_version) in pending_update_versions.drain() { + tracked_resources.insert(resource_id, resource_version); + } + None + } else { + let error = rejected_configs + .into_iter() + .map(|reject| reject.to_string()) + .collect::>() + .join("; "); + debug!( + type_url = type_url.to_string(), + error, + nonce, + "rejecting configs with nack response", + ); + Some(Status { + message: error, + code: tonic::Code::InvalidArgument.into(), + ..Default::default() + }) + }; + if let Err(err) = acknowledgments_tx.send(DeltaDiscoveryRequest { + type_url: type_url.to_string(), + response_nonce: nonce, + error_detail: error, ..Default::default() - }) - }; - if let Err(err) = acknowledgments_tx.send(DeltaDiscoveryRequest { - type_url: type_url.to_string(), - response_nonce: nonce, - error_detail: error, + }).await { + warn!("error in send xDS ack/nack upstream {:?}", err); + } + }, + Err(err) => { + warn!("error in reading internal ack/nack {:?}", err); + }, + } + } + () = time::sleep(ACK_TIMEOUT) => { + warn!("timed out while waiting to acknowledge config updates"); + let error = pending_update_versions.into_keys() + .collect::>() + .join("; "); + let _ = acknowledgments_tx.send(DeltaDiscoveryRequest { + type_url: type_url.to_string(), + response_nonce: nonce, + error_detail: Some(Status { + message: error, ..Default::default() - }) - .await - { - warn!("error in send xDS ack/nack upstream {:?}", err); - } - }, - Err(err) => { - warn!("error in reading internal ack/nack {:?}", err); - }, + }), + ..Default::default() + }).await; } } - () = time::sleep(ACK_TIMEOUT) => { - warn!("timed out while waiting to acknowledge config updates"); - let error = 
pending_update_versions.into_keys() - .collect::>() - .join("; "); - let error = Status { - message: error, - ..Default::default() - }; - let _ = acknowledgments_tx.send(DeltaDiscoveryRequest { + } else { + let error = + decoding_errors.into_iter().map(|reject| reject.to_string()).collect::>().join("; "); + if let Err(err) = acknowledgments_tx + .send(DeltaDiscoveryRequest { type_url: type_url.to_string(), - response_nonce: nonce, - error_detail: Some(error), + response_nonce: nonce, + error_detail: Some(Status { + message: error, + code: tonic::Code::InvalidArgument.into(), ..Default::default() - }) - .await; + }), + ..Default::default() + }) + .await + { + warn!("error in send xDS ack/nack upstream {:?}", err); } } @@ -426,7 +441,7 @@ impl DeltaClientBackgroundWorker { state: &mut DiscoveryClientState, response: DeltaDiscoveryResponse, type_url: TypeUrl, - ) -> (Vec, HashMap) { + ) -> (Vec, HashMap, Vec) { let for_removal: Vec = response .removed_resources .iter() @@ -440,6 +455,7 @@ impl DeltaClientBackgroundWorker { .collect(); let mut pending_update_versions = HashMap::::new(); + let mut decoding_errors = Vec::::new(); let updates = response .resources @@ -449,7 +465,14 @@ impl DeltaClientBackgroundWorker { let resource_version = resource.version.clone(); let decoded = XdsResourcePayload::try_from((resource, type_url)); if decoded.is_err() { - warn!("problem decoding config update for {} : error {:?}", resource_id, decoded.as_ref().err()); + let error_msg = format!( + "problem decoding config update for {} : error {:?}", + resource_id, + decoded.as_ref().err() + ); + let decoding_error: orion_error::Error = error_msg.clone().into(); + decoding_errors.push(RejectedConfig::from((resource_id.clone(), decoding_error))); + warn!(error_msg); } else { pending_update_versions.insert(resource_id.clone(), resource_version); debug!("decoded config update for resource {resource_id}"); @@ -459,6 +482,6 @@ impl DeltaClientBackgroundWorker { 
.chain(for_removal.into_iter().map(|resource_id| XdsResourceUpdate::Remove(resource_id, type_url))) .collect(); - (updates, pending_update_versions) + (updates, pending_update_versions, decoding_errors) } } From aada8452fa37d57a9af37a94e3b86ede5402fa73 Mon Sep 17 00:00:00 2001 From: Alan Keane Date: Tue, 4 Mar 2025 12:04:32 +0000 Subject: [PATCH 07/64] Refactor xDS client.rs to improve readability Signed-off-by: Alan Keane --- orion-proxy/src/xds_configurator.rs | 2 +- orion-xds/examples/client.rs | 2 +- orion-xds/src/xds/client.rs | 327 ++++++++++++++-------------- orion-xds/src/xds/mod.rs | 1 + orion-xds/src/xds/model.rs | 5 +- orion-xds/src/xds/request.rs | 105 +++++++++ 6 files changed, 275 insertions(+), 167 deletions(-) create mode 100644 orion-xds/src/xds/request.rs diff --git a/orion-proxy/src/xds_configurator.rs b/orion-proxy/src/xds_configurator.rs index 6f0c0750..9ba0f579 100644 --- a/orion-proxy/src/xds_configurator.rs +++ b/orion-proxy/src/xds_configurator.rs @@ -160,7 +160,7 @@ impl XdsConfigurationHandler { let mut rejected_updates = Vec::new(); for update in updates { match update { - XdsResourceUpdate::Update(id, resource) => { + XdsResourceUpdate::Update(id, resource, _) => { if let Err(e) = self.process_update_event(&id, resource).await { rejected_updates.push(RejectedConfig::from((id, e))); } diff --git a/orion-xds/examples/client.rs b/orion-xds/examples/client.rs index 858f20b6..f149915e 100644 --- a/orion-xds/examples/client.rs +++ b/orion-xds/examples/client.rs @@ -33,7 +33,7 @@ async fn main() -> Result<(), Box> { for update in notification.updates { match update { - XdsResourceUpdate::Update(_id, resource) => match resource { + XdsResourceUpdate::Update(_id, resource, _) => match resource { XdsResourcePayload::Listener(_id, resource) => { info!("Got update for listener {resource:#?}"); }, diff --git a/orion-xds/src/xds/client.rs b/orion-xds/src/xds/client.rs index ac65d56a..94e44d45 100644 --- a/orion-xds/src/xds/client.rs +++ 
b/orion-xds/src/xds/client.rs @@ -21,14 +21,12 @@ use super::{ bindings, model::{RejectedConfig, ResourceId, ResourceVersion, TypeUrl, XdsError, XdsResourcePayload, XdsResourceUpdate}, + request::{DeltaDiscoveryRequestBuilder, StatusBuilder}, }; +use core::result::Result::{Err, Ok}; use orion_configuration::config::bootstrap::Node; use orion_data_plane_api::envoy_data_plane_api::{ - envoy::{ - config::core::v3::Node as EnvoyNode, - service::discovery::v3::{DeltaDiscoveryRequest, DeltaDiscoveryResponse}, - }, - google::rpc::Status, + envoy::service::discovery::v3::{DeltaDiscoveryRequest, DeltaDiscoveryResponse}, tonic, }; use std::{ @@ -160,7 +158,6 @@ pub struct DeltaClientBackgroundWorker { impl DeltaClientBackgroundWorker { pub async fn run(&mut self) -> Result<(), XdsError> { let mut connection_id = 0; - let mut state = DiscoveryClientState { backoff: INITIAL_BACKOFF, tracked: HashMap::new(), @@ -190,7 +187,7 @@ impl DiscoveryClientState { impl DeltaClientBackgroundWorker { async fn persistently_connect(&mut self, state: &mut DiscoveryClientState) { - match self.stream_resources(state).await { + match self.continuously_discover_resources(state).await { Err(ref e @ XdsError::GrpcStatus(ref status)) => { let backoff = std::cmp::min(MAX_BACKOFF, state.backoff * 2); let err_detail = e.to_string(); @@ -221,54 +218,27 @@ impl DeltaClientBackgroundWorker { } } - async fn stream_resources(&mut self, state: &mut DiscoveryClientState) -> Result<(), XdsError> { + async fn continuously_discover_resources(&mut self, state: &mut DiscoveryClientState) -> Result<(), XdsError> { let (discovery_requests_tx, mut discovery_requests_rx) = mpsc::channel::(100); - let resource_types = match C::type_url() { - Some(type_url) => vec![type_url], - _ => vec![ - TypeUrl::Secret, - TypeUrl::ClusterLoadAssignment, - TypeUrl::Cluster, - TypeUrl::RouteConfiguration, - TypeUrl::Listener, - ], - }; - let initial_requests: Vec = resource_types - .iter() - .map(|resource_type| { - let subscriptions 
= state.subscriptions.get(resource_type).cloned().unwrap_or_default(); - let already_tracked: HashMap = - state.tracked.get(resource_type).cloned().unwrap_or_default(); - DeltaDiscoveryRequest { - node: Some(EnvoyNode { id: self.node.id.clone().into_string(), ..Default::default() }), - type_url: resource_type.to_string(), - initial_resource_versions: already_tracked, - resource_names_subscribe: subscriptions.into_iter().collect(), - ..Default::default() - } - }) - .collect(); - - let outbound_requests = async_stream::stream! { + let initial_requests = self.build_initial_discovery_requests(state); + let outbound_request_stream = async_stream::stream! { for request in initial_requests { - info!( - "sending initial discovery request {request:?}", - ); - + info!("sending initial discovery request {request:?}"); yield request; } while let Some(message) = discovery_requests_rx.recv().await { - info!( - "sending subsequent discovery request {message:?}", - ); + info!("sending upstream xDS message {message:?}"); yield message } warn!("outbound discovery request stream has ended!"); }; - - let mut response_stream = - self.client_binding.delta_request(outbound_requests).await.map_err(XdsError::GrpcStatus)?.into_inner(); + let mut inbound_response_stream = self + .client_binding + .delta_request(outbound_request_stream) + .await + .map_err(XdsError::GrpcStatus)? 
+ .into_inner(); info!("xDS stream established"); state.reset_backoff(); @@ -277,13 +247,13 @@ impl DeltaClientBackgroundWorker { Some(event) = self.subscriptions_rx.recv() => { self.process_subscription_event(event, state, &discovery_requests_tx).await; } - discovered = response_stream.message() => { + discovered = inbound_response_stream.message() => { let payload = discovered?; let discovery_response = payload.ok_or(XdsError::UnknownResourceType("empty payload received".to_owned()))?; - self.process_and_acknowledge(discovery_response, &discovery_requests_tx, state).await?; + self.process_discovery_response(discovery_response, &discovery_requests_tx, state).await?; }, else => { - warn!("All channels are closed...exiting"); + warn!("xDS channels are closed...exiting"); return Ok(()) } } @@ -302,12 +272,12 @@ impl DeltaClientBackgroundWorker { let is_new = state.subscriptions.entry(type_url).or_default().insert(resource_id.clone()); if is_new { if let Err(err) = discovery_requests_tx - .send(DeltaDiscoveryRequest { - node: Some(EnvoyNode { id: self.node.id.clone().into_string(), ..Default::default() }), - type_url: type_url.to_string(), - resource_names_subscribe: vec![resource_id], - ..Default::default() - }) + .send( + DeltaDiscoveryRequestBuilder::for_resource(type_url) + .with_node_id(self.node.id.clone().to_string()) + .with_resource_names_subscribe(vec![resource_id]) + .build(), + ) .await { warn!("problems updating subscription: {:?}", err); @@ -319,12 +289,12 @@ impl DeltaClientBackgroundWorker { let was_subscribed = state.subscriptions.entry(type_url).or_default().remove(resource_id.as_str()); if was_subscribed { if let Err(err) = discovery_requests_tx - .send(DeltaDiscoveryRequest { - node: Some(EnvoyNode { id: self.node.id.clone().into_string(), ..Default::default() }), - type_url: type_url.to_string(), - resource_names_unsubscribe: vec![resource_id], - ..Default::default() - }) + .send( + DeltaDiscoveryRequestBuilder::for_resource(type_url) + 
.with_node_id(self.node.id.clone().to_string()) + .with_resource_names_unsubscribe(vec![resource_id]) + .build(), + ) .await { warn!("problems updating subscription: {:?}", err); @@ -333,7 +303,8 @@ impl DeltaClientBackgroundWorker { }, } } - async fn process_and_acknowledge( + + async fn process_discovery_response( &mut self, response: DeltaDiscoveryResponse, acknowledgments_tx: &mpsc::Sender, @@ -342,107 +313,124 @@ impl DeltaClientBackgroundWorker { let type_url = TypeUrl::try_from(response.type_url.as_str())?; let nonce = response.nonce.clone(); info!(type_url = type_url.to_string(), size = response.resources.len(), "received config resources from xDS"); - - let (updates, mut pending_update_versions, decoding_errors) = Self::map_updates(state, response, type_url); - - if decoding_errors.is_empty() { - let (internal_ack_tx, internal_ack_rx) = oneshot::channel::>(); - let notification = XdsUpdateEvent { updates, ack_channel: internal_ack_tx }; - self.resources_tx.send(notification).await.map_err(|e: mpsc::error::SendError| { - XdsError::InternalProcessingError(e.to_string()) - })?; - tokio::select! 
{ - ack = internal_ack_rx => { - match ack { - Ok(rejected_configs) => { - let error = if rejected_configs.is_empty() { - debug!( - type_url = type_url.to_string(), - nonce, - "sending ack response after processing", - ); - let tracked_resources = state.tracked.entry(type_url).or_default(); - for (resource_id, resource_version) in pending_update_versions.drain() { - tracked_resources.insert(resource_id, resource_version); - } - None - } else { - let error = rejected_configs - .into_iter() - .map(|reject| reject.to_string()) - .collect::>() - .join("; "); - debug!( - type_url = type_url.to_string(), - error, - nonce, - "rejecting configs with nack response", - ); - Some(Status { - message: error, - code: tonic::Code::InvalidArgument.into(), - ..Default::default() - }) - }; - if let Err(err) = acknowledgments_tx.send(DeltaDiscoveryRequest { - type_url: type_url.to_string(), - response_nonce: nonce, - error_detail: error, - ..Default::default() - }).await { - warn!("error in send xDS ack/nack upstream {:?}", err); - } - }, - Err(err) => { - warn!("error in reading internal ack/nack {:?}", err); - }, + let for_removal = Self::process_resource_ids_for_removal(state, &response, type_url); + + match Self::decode_pending_updates(&response, type_url) { + Ok(mut decoded_updates) => { + let (internal_ack_tx, internal_ack_rx) = oneshot::channel::>(); + + let mut pending_update_versions = Self::extract_update_versions(&decoded_updates); + let mut removal_notifications = for_removal + .iter() + .map(|resource_id| XdsResourceUpdate::Remove(resource_id.to_string(), type_url)) + .collect::>(); + + let mut batched_updates = Vec::::new(); + batched_updates.append(&mut decoded_updates); + batched_updates.append(&mut removal_notifications); + let batch_notification = XdsUpdateEvent { updates: batched_updates, ack_channel: internal_ack_tx }; + self.resources_tx.send(batch_notification).await.map_err( + |e: mpsc::error::SendError| XdsError::InternalProcessingError(e.to_string()), + )?; + 
+ tokio::select! { + ack = internal_ack_rx => { + match ack { + Ok(rejected_configs) => { + let maybe_error = if rejected_configs.is_empty() { + debug!(type_url = type_url.to_string(), nonce, "sending ack response after processing"); + let tracked_resources = state.tracked.entry(type_url).or_default(); + for (resource_id, resource_version) in pending_update_versions.drain() { + tracked_resources.insert(resource_id, resource_version); + } + None + } else { + let error_msg = rejected_configs.into_iter() + .map(|reject| reject.to_string()) + .collect::>() + .join("; "); + warn!(type_url = type_url.to_string(), error_msg, nonce, "rejecting configs with nack response"); + Some(StatusBuilder::invalid_argument().with_message(error_msg).build()) + }; + let upstream_response = DeltaDiscoveryRequestBuilder::for_resource(type_url) + .with_nounce(nonce.clone()) + .with_error_detail(maybe_error) + .build(); + if let Err(err) = acknowledgments_tx.send(upstream_response).await { + warn!("error in send xDS ack/nack upstream {:?}", err); + }; + }, + Err(err) => { + warn!("error in reading internal ack/nack {:?}", err); + }, + } + } + () = time::sleep(ACK_TIMEOUT) => { + warn!("timed out while waiting to acknowledge config updates"); + let version_info = pending_update_versions.into_keys() + .collect::>() + .join("; "); + let error_msg = format!("timed out trying to apply resource updates for [{version_info}]"); + let upstream_response = DeltaDiscoveryRequestBuilder::for_resource(type_url) + .with_nounce(nonce.clone()) + .with_error_detail(Some(StatusBuilder::unspecified_error().with_message(error_msg).build())) + .build(); + let _ = acknowledgments_tx.send(upstream_response).await; } } - () = time::sleep(ACK_TIMEOUT) => { - warn!("timed out while waiting to acknowledge config updates"); - let error = pending_update_versions.into_keys() - .collect::>() - .join("; "); - let _ = acknowledgments_tx.send(DeltaDiscoveryRequest { - type_url: type_url.to_string(), - response_nonce: nonce, 
- error_detail: Some(Status { - message: error, - ..Default::default() - }), - ..Default::default() - }).await; + }, + + Err(decoding_errors) => { + let error_msg = + decoding_errors.into_iter().map(|reject| reject.to_string()).collect::>().join("; "); + warn!( + type_url = type_url.to_string(), + error_msg, nonce, "decoding error, rejecting configs with nack response" + ); + let upstream_nack_response = DeltaDiscoveryRequestBuilder::for_resource(type_url) + .with_nounce(nonce) + .with_error_detail(Some(StatusBuilder::invalid_argument().with_message(error_msg).build())) + .build(); + if let Err(err) = acknowledgments_tx.send(upstream_nack_response).await { + warn!("error in send xDS ack/nack upstream {:?}", err); } - } - } else { - let error = - decoding_errors.into_iter().map(|reject| reject.to_string()).collect::>().join("; "); - if let Err(err) = acknowledgments_tx - .send(DeltaDiscoveryRequest { - type_url: type_url.to_string(), - response_nonce: nonce, - error_detail: Some(Status { - message: error, - code: tonic::Code::InvalidArgument.into(), - ..Default::default() - }), - ..Default::default() - }) - .await - { - warn!("error in send xDS ack/nack upstream {:?}", err); - } + }, } - Ok(()) } - fn map_updates( + fn build_initial_discovery_requests(&self, tracking_state: &DiscoveryClientState) -> Vec { + let resource_types = match C::type_url() { + Some(type_url) => vec![type_url], + _ => vec![ + TypeUrl::Secret, + TypeUrl::ClusterLoadAssignment, + TypeUrl::Cluster, + TypeUrl::RouteConfiguration, + TypeUrl::Listener, + ], + }; + resource_types + .iter() + .map(|resource_type| { + let subscriptions = tracking_state.subscriptions.get(resource_type).cloned().unwrap_or_default(); + let already_tracked: HashMap = + tracking_state.tracked.get(resource_type).cloned().unwrap_or_default(); + DeltaDiscoveryRequestBuilder::for_resource(resource_type.to_owned()) + .with_node_id(self.node.id.clone().to_string()) + .with_initial_resource_versions(already_tracked) + 
.with_resource_names_subscribe(subscriptions.into_iter().collect()) + .build() + }) + .collect() + } + + fn process_resource_ids_for_removal( state: &mut DiscoveryClientState, - response: DeltaDiscoveryResponse, + response: &DeltaDiscoveryResponse, type_url: TypeUrl, - ) -> (Vec, HashMap, Vec) { - let for_removal: Vec = response + ) -> Vec { + response .removed_resources .iter() .map(|resource_id| { @@ -452,13 +440,17 @@ impl DeltaClientBackgroundWorker { } resource_id.clone() }) - .collect(); + .collect() + } - let mut pending_update_versions = HashMap::::new(); + fn decode_pending_updates( + response: &DeltaDiscoveryResponse, + type_url: TypeUrl, + ) -> Result, Vec> { let mut decoding_errors = Vec::::new(); - - let updates = response + let decoded_updates = response .resources + .clone() .into_iter() .filter_map(|resource| { let resource_id = resource.name.clone(); @@ -473,15 +465,24 @@ impl DeltaClientBackgroundWorker { let decoding_error: orion_error::Error = error_msg.clone().into(); decoding_errors.push(RejectedConfig::from((resource_id.clone(), decoding_error))); warn!(error_msg); - } else { - pending_update_versions.insert(resource_id.clone(), resource_version); - debug!("decoded config update for resource {resource_id}"); } - decoded.ok().map(|value| XdsResourceUpdate::Update(resource_id.clone(), value)) + decoded.ok().map(|value| XdsResourceUpdate::Update(resource_id, value, resource_version)) }) - .chain(for_removal.into_iter().map(|resource_id| XdsResourceUpdate::Remove(resource_id, type_url))) .collect(); + if decoding_errors.is_empty() { + Ok(decoded_updates) + } else { + Err(decoding_errors) + } + } - (updates, pending_update_versions, decoding_errors) + fn extract_update_versions(updates: &[XdsResourceUpdate]) -> HashMap { + let mut update_versions = HashMap::::new(); + for update in updates { + if let XdsResourceUpdate::Update(resource_id, _, resource_version) = update { + update_versions.insert(resource_id.to_string(), 
resource_version.to_string()); + } + } + update_versions } } diff --git a/orion-xds/src/xds/mod.rs b/orion-xds/src/xds/mod.rs index 94b02b44..b3b74881 100644 --- a/orion-xds/src/xds/mod.rs +++ b/orion-xds/src/xds/mod.rs @@ -22,5 +22,6 @@ pub mod bindings; pub mod client; pub mod model; +mod request; pub mod resources; pub mod server; diff --git a/orion-xds/src/xds/model.rs b/orion-xds/src/xds/model.rs index 5a4e9404..9b3348b9 100644 --- a/orion-xds/src/xds/model.rs +++ b/orion-xds/src/xds/model.rs @@ -18,6 +18,7 @@ // // +use core::result::Result::Err; use orion_configuration::config::{ cluster::Cluster, cluster::ClusterLoadAssignment, common::GenericError, listener::Listener, network_filters::http_connection_manager::RouteConfiguration, secret::Secret, @@ -48,14 +49,14 @@ pub type ResourceVersion = String; #[derive(Debug)] pub enum XdsResourceUpdate { - Update(ResourceId, XdsResourcePayload), + Update(ResourceId, XdsResourcePayload, ResourceVersion), Remove(ResourceId, TypeUrl), } impl XdsResourceUpdate { pub fn id(&self) -> ResourceId { match self { - XdsResourceUpdate::Update(id, _) | XdsResourceUpdate::Remove(id, _) => id.to_string(), + XdsResourceUpdate::Update(id, _, _) | XdsResourceUpdate::Remove(id, _) => id.to_string(), } } } diff --git a/orion-xds/src/xds/request.rs b/orion-xds/src/xds/request.rs new file mode 100644 index 00000000..c6fb3953 --- /dev/null +++ b/orion-xds/src/xds/request.rs @@ -0,0 +1,105 @@ +use super::model::{ResourceId, ResourceVersion, TypeUrl}; +use ng2_data_plane_api::envoy_data_plane_api::{ + envoy::{config::core::v3::Node as EnvoyNode, service::discovery::v3::DeltaDiscoveryRequest}, + google::rpc::Status, + tonic, +}; +use std::collections::HashMap; + +pub struct StatusBuilder { + code: i32, + message: String, +} + +#[allow(dead_code)] +impl StatusBuilder { + pub fn unspecified_error() -> Self { + StatusBuilder { code: tonic::Code::Unknown.into(), message: String::new() } + } + + pub fn invalid_argument() -> Self { + StatusBuilder 
{ code: tonic::Code::InvalidArgument.into(), message: String::new() } + } + + pub fn with_message(mut self, message: String) -> Self { + self.message = message; + self + } + + pub fn build(self) -> Status { + Status { message: self.message, code: self.code, ..Default::default() } + } +} + +pub struct DeltaDiscoveryRequestBuilder { + node: Option, + nounce: Option, + type_url: TypeUrl, + error_detail: Option, + resource_names_subscribe: Vec, + resource_names_unsubscribe: Vec, + initial_resource_versions: HashMap, +} + +#[allow(dead_code)] +impl DeltaDiscoveryRequestBuilder { + pub fn for_resource(type_url: TypeUrl) -> Self { + DeltaDiscoveryRequestBuilder { + node: None, + nounce: None, + type_url, + error_detail: None, + resource_names_subscribe: Vec::new(), + resource_names_unsubscribe: Vec::new(), + initial_resource_versions: HashMap::new(), + } + } + + pub fn with_node_id(mut self, node_id: String) -> Self { + self.node = Some(node_id); + self + } + + pub fn with_nounce(mut self, nounce: String) -> Self { + self.nounce = Some(nounce); + self + } + + pub fn with_resource_names_subscribe(mut self, resource_names: Vec) -> Self { + self.resource_names_subscribe = resource_names; + self + } + + pub fn with_resource_names_unsubscribe(mut self, resource_names: Vec) -> Self { + self.resource_names_unsubscribe = resource_names; + self + } + + pub fn with_initial_resource_versions( + mut self, + initial_resource_versions: HashMap, + ) -> Self { + self.initial_resource_versions = initial_resource_versions; + self + } + + pub fn with_error_detail(mut self, error_detail: Option) -> Self { + self.error_detail = error_detail; + self + } + + pub fn build(self) -> DeltaDiscoveryRequest { + let node_id = self.node.unwrap_or_default(); + let nounce = self.nounce.unwrap_or_default(); + DeltaDiscoveryRequest { + node: Some(EnvoyNode { id: node_id, ..Default::default() }), + response_nonce: nounce, + type_url: self.type_url.to_string(), + resource_names_subscribe: 
self.resource_names_subscribe, + resource_names_unsubscribe: self.resource_names_unsubscribe, + initial_resource_versions: self.initial_resource_versions, + error_detail: self.error_detail, + ..Default::default() + } + } +} From d8c28eb2cbc559e1c47e69efa88b150ab63a8b74 Mon Sep 17 00:00:00 2001 From: Nicola Bonelli Date: Mon, 12 May 2025 12:11:47 +0200 Subject: [PATCH 08/64] Add orion-format module with Envoy grammar and token parsing Includes: - EnvoyGrammar for parsing Envoy-style log templates - Token definitions and argument handling - Unit tests for various parsing scenarios Signed-off-by: Nicola Bonelli --- Cargo.lock | 46 ++++ Cargo.toml | 4 +- orion-format/Cargo.toml | 15 ++ orion-format/src/context.rs | 0 orion-format/src/envoy.rs | 431 ++++++++++++++++++++++++++++++++++++ orion-format/src/lib.rs | 96 ++++++++ orion-format/src/token.rs | 149 +++++++++++++ 7 files changed, 740 insertions(+), 1 deletion(-) create mode 100644 orion-format/Cargo.toml create mode 100644 orion-format/src/context.rs create mode 100644 orion-format/src/envoy.rs create mode 100644 orion-format/src/lib.rs create mode 100644 orion-format/src/token.rs diff --git a/Cargo.lock b/Cargo.lock index c15d9778..f667a54c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1681,6 +1681,18 @@ dependencies = [ name = "orion-error" version = "0.1.0" +[[package]] +name = "orion-format" +version = "0.1.0" +dependencies = [ + "http", + "lazy_static", + "ptrie", + "strum", + "strum_macros", + "thiserror", +] + [[package]] name = "orion-lib" version = "0.1.0" @@ -2008,6 +2020,18 @@ dependencies = [ "prost", ] +[[package]] +name = "ptrie" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4debfffbf1669869b4bea1bab297f8e9ec11900a9ff2dcf659c2a1c4918c811a" + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] 
name = "quote" version = "1.0.40" @@ -2509,6 +2533,28 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 32601b24..75902421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "orion-error", "orion-lib", "orion-proxy", + "orion-format", "orion-xds", ] resolver = "2" @@ -15,6 +16,7 @@ envoy-data-plane-api = { path = "envoy-data-plane-api" } orion-configuration = { path = "orion-configuration" } orion-data-plane-api = { path = "orion-data-plane-api" } orion-error = { path = "orion-error" } +orion-format = { path = "orion-format" } orion-lib = { path = "orion-lib" } orion-xds = { path = "orion-xds" } @@ -93,4 +95,4 @@ inconsistent_struct_constructor = "allow" semicolon-if-nothing-returned = "allow" missing_errors_doc = "allow" must_use_candidate = "allow" -doc_markdown = "allow" \ No newline at end of file +doc_markdown = "allow" diff --git a/orion-format/Cargo.toml b/orion-format/Cargo.toml new file mode 100644 index 00000000..d080840a --- /dev/null +++ b/orion-format/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "orion-format" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +http.workspace = true +lazy_static = "1.5.0" +ptrie = "0.7.2" +strum = { version = "0.27.1", features = ["derive"] } 
+strum_macros = "0.27.1" +thiserror.workspace = true diff --git a/orion-format/src/context.rs b/orion-format/src/context.rs new file mode 100644 index 00000000..e69de29b diff --git a/orion-format/src/envoy.rs b/orion-format/src/envoy.rs new file mode 100644 index 00000000..a6ab1e27 --- /dev/null +++ b/orion-format/src/envoy.rs @@ -0,0 +1,431 @@ +use crate::token::{ReqArg, RespArg, Token, TokenArgument}; +use crate::{FormatError, Grammar, Template}; +use http::HeaderName; +use lazy_static::lazy_static; +use ptrie::Trie; + +macro_rules! trie_mapstr { + ($trie:expr, $lit:literal, $tok:expr) => { + $trie.insert($lit.bytes(), ($tok, $lit.len(), false)); // expand the variadic arguments into the tuple. + }; + ($trie:expr, $lit:literal, $tok:expr, $arg:expr ) => { + $trie.insert($lit.bytes(), ($tok, $lit.len(), $arg)); // expand the variadic arguments into the tuple. + }; +} + +lazy_static! { + static ref ENVOY_REQ_ARGS: Trie = { + let mut trie = Trie::new(); + trie_mapstr!(trie, ":SCHEME",ReqArg::Scheme); + trie_mapstr!(trie, ":METHOD",ReqArg::Method); + trie_mapstr!(trie, ":PATH", ReqArg::Path); + trie_mapstr!(trie, ":AUTHORITY",ReqArg::Authority); + trie + }; + + static ref ENVOY_RESP_ARGS: Trie = { + let trie = Trie::new(); + trie + }; + + static ref ENVOY_PATTERNS: Trie = { + let mut trie = Trie::new(); + trie_mapstr!(trie, "REQUEST_DURATION", Token::RequestDuration); + trie_mapstr!(trie, "REQUEST_TX_DURATION", Token::RequestTxDuration); + trie_mapstr!(trie, "RESPONSE_DURATION", Token::ResponseDuration); + trie_mapstr!(trie, "RESPONSE_TX_DURATION", Token::ResponseTxDuration); + trie_mapstr!(trie, "DOWNSTREAM_HANDSHAKE_DURATION", Token::DownstreamHandshakeDuration); + trie_mapstr!(trie, "ROUNDTRIP_DURATION", Token::RoundtripDuration); + trie_mapstr!(trie, "BYTES_RECEIVED", Token::BytesReceived); + trie_mapstr!(trie, "BYTES_RETRANSMITTED", Token::BytesRetransmitted); + trie_mapstr!(trie, "PACKETS_RETRANSMITTED", Token::PacketsRetransmitted); + trie_mapstr!(trie, 
"UPSTREAM_WIRE_BYTES_RECEIVED", Token::UpstreamWireBytesReceived); + trie_mapstr!(trie, "UPSTREAM_HEADER_BYTES_RECEIVED", Token::UpstreamHeaderBytesReceived); + trie_mapstr!(trie, "DOWNSTREAM_WIRE_BYTES_RECEIVED", Token::DownstreamWireBytesReceived); + trie_mapstr!(trie, "DOWNSTREAM_HEADER_BYTES_RECEIVED", Token::DownstreamHeaderBytesReceived); + trie_mapstr!(trie, "PROTOCOL", Token::Protocol); + trie_mapstr!(trie, "UPSTREAM_PROTOCOL", Token::UpstreamProtocol); + trie_mapstr!(trie, "RESPONSE_CODE", Token::ResponseCode); + trie_mapstr!(trie, "RESPONSE_CODE_DETAILS", Token::ResponseCodeDetails); + trie_mapstr!(trie, "CONNECTION_TERMINATION_DETAILS", Token::ConnectionTerminationDetails); + trie_mapstr!(trie, "BYTES_SENT", Token::BytesSent); + trie_mapstr!(trie, "UPSTREAM_WIRE_BYTES_SENT", Token::UpstreamWireBytesSent); + trie_mapstr!(trie, "UPSTREAM_HEADER_BYTES_SENT", Token::UpstreamHeaderBytesSent); + trie_mapstr!(trie, "DOWNSTREAM_WIRE_BYTES_SENT", Token::DownstreamWireBytesSent); + trie_mapstr!(trie, "DOWNSTREAM_HEADER_BYTES_SENT", Token::DownstreamHeaderBytesSent); + trie_mapstr!(trie, "DURATION", Token::Duration); + trie_mapstr!(trie, "COMMON_DURATION", Token::CommonDuration); + trie_mapstr!(trie, "CUSTOM_FLAGS", Token::CustomFlags); + trie_mapstr!(trie, "RESPONSE_FLAGS", Token::ResponseFlags); + trie_mapstr!(trie, "RESPONSE_FLAGS_LONG", Token::ResponseFlagsLong); + trie_mapstr!(trie, "UPSTREAM_HOST_NAME", Token::UpstreamHostName); + trie_mapstr!(trie, "UPSTREAM_HOST_NAME_WITHOUT_PORT", Token::UpstreamHostNameWithoutPort); + trie_mapstr!(trie, "UPSTREAM_HOST", Token::UpstreamHost); + trie_mapstr!(trie, "UPSTREAM_CONNECTION_ID", Token::UpstreamConnectionId); + trie_mapstr!(trie, "UPSTREAM_CLUSTER", Token::UpstreamCluster); + trie_mapstr!(trie, "UPSTREAM_CLUSTER_RAW", Token::UpstreamClusterRaw); + trie_mapstr!(trie, "UPSTREAM_LOCAL_ADDRESS", Token::UpstreamLocalAddress); + trie_mapstr!(trie, "UPSTREAM_LOCAL_ADDRESS_WITHOUT_PORT", 
Token::UpstreamLocalAddressWithoutPort); + trie_mapstr!(trie, "UPSTREAM_LOCAL_PORT", Token::UpstreamLocalPort); + trie_mapstr!(trie, "UPSTREAM_REMOTE_ADDRESS", Token::UpstreamRemoteAddress); + trie_mapstr!(trie, "UPSTREAM_REMOTE_ADDRESS_WITHOUT_PORT", Token::UpstreamRemoteAddressWithoutPort); + trie_mapstr!(trie, "UPSTREAM_REMOTE_PORT", Token::UpstreamRemotePort); + trie_mapstr!(trie, "UPSTREAM_REQUEST_ATTEMPT_COUNT", Token::UpstreamRequestAttemptCount); + trie_mapstr!(trie, "UPSTREAM_TLS_CIPHER", Token::UpstreamTlsCipher); + trie_mapstr!(trie, "UPSTREAM_TLS_VERSION", Token::UpstreamTlsVersion); + trie_mapstr!(trie, "UPSTREAM_TLS_SESSION_ID", Token::UpstreamTlsSessionId); + trie_mapstr!(trie, "UPSTREAM_PEER_ISSUER", Token::UpstreamPeerIssuer); + trie_mapstr!(trie, "UPSTREAM_PEER_CERT", Token::UpstreamPeerCert); + trie_mapstr!(trie, "UPSTREAM_PEER_SUBJECT", Token::UpstreamPeerSubject); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_ADDRESS", Token::DownstreamLocalAddress); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_LOCAL_ADDRESS", Token::DownstreamDirectLocalAddress); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_ADDRESS_WITHOUT_PORT", Token::DownstreamLocalAddressWithoutPort); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_LOCAL_ADDRESS_WITHOUT_PORT", Token::DownstreamDirectLocalAddressWithoutPort); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_PORT", Token::DownstreamLocalPort); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_LOCAL_PORT", Token::DownstreamDirectLocalPort); + trie_mapstr!(trie, "DOWNSTREAM_REMOTE_ADDRESS", Token::DownstreamRemoteAddress); + trie_mapstr!(trie, "DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT", Token::DownstreamRemoteAddressWithoutPort); + trie_mapstr!(trie, "DOWNSTREAM_REMOTE_PORT", Token::DownstreamRemotePort); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_REMOTE_ADDRESS", Token::DownstreamDirectRemoteAddress); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_REMOTE_ADDRESS_WITHOUT_PORT", Token::DownstreamDirectRemoteAddressWithoutPort); + trie_mapstr!(trie, "DOWNSTREAM_DIRECT_REMOTE_PORT", 
Token::DownstreamDirectRemotePort); + trie_mapstr!(trie, "CONNECTION_ID", Token::ConnectionId); + trie_mapstr!(trie, "REQUESTED_SERVER_NAME", Token::RequestedServerName); + trie_mapstr!(trie, "ROUTE_NAME", Token::RouteName); + trie_mapstr!(trie, "UPSTREAM_PEER_URI_SAN", Token::UpstreamPeerUriSan); + trie_mapstr!(trie, "UPSTREAM_PEER_DNS_SAN", Token::UpstreamPeerDnsSan); + trie_mapstr!(trie, "UPSTREAM_PEER_IP_SAN", Token::UpstreamPeerIpSan); + trie_mapstr!(trie, "UPSTREAM_LOCAL_URI_SAN", Token::UpstreamLocalUriSan); + trie_mapstr!(trie, "UPSTREAM_LOCAL_DNS_SAN", Token::UpstreamLocalDnsSan); + trie_mapstr!(trie, "UPSTREAM_LOCAL_IP_SAN", Token::UpstreamLocalIpSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_URI_SAN", Token::DownstreamPeerUriSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_DNS_SAN", Token::DownstreamPeerDnsSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_IP_SAN", Token::DownstreamPeerIpSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_EMAIL_SAN", Token::DownstreamPeerEmailSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_OTHERNAME_SAN", Token::DownstreamPeerOthernameSan); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_URI_SAN", Token::DownstreamLocalUriSan); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_DNS_SAN", Token::DownstreamLocalDnsSan); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_IP_SAN", Token::DownstreamLocalIpSan); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_EMAIL_SAN", Token::DownstreamLocalEmailSan); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_OTHERNAME_SAN", Token::DownstreamLocalOthernameSan); + trie_mapstr!(trie, "DOWNSTREAM_PEER_SUBJECT", Token::DownstreamPeerSubject); + trie_mapstr!(trie, "DOWNSTREAM_LOCAL_SUBJECT", Token::DownstreamLocalSubject); + trie_mapstr!(trie, "DOWNSTREAM_TLS_SESSION_ID", Token::DownstreamTlsSessionId); + trie_mapstr!(trie, "DOWNSTREAM_TLS_CIPHER", Token::DownstreamTlsCipher); + trie_mapstr!(trie, "DOWNSTREAM_TLS_VERSION", Token::DownstreamTlsVersion); + trie_mapstr!(trie, "DOWNSTREAM_PEER_FINGERPRINT_256", Token::DownstreamPeerFingerprint256); + trie_mapstr!(trie, 
"DOWNSTREAM_PEER_FINGERPRINT_1", Token::DownstreamPeerFingerprint1); + trie_mapstr!(trie, "DOWNSTREAM_PEER_SERIAL", Token::DownstreamPeerSerial); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CHAIN_FINGERPRINTS_256", Token::DownstreamPeerChainFingerprints256); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CHAIN_FINGERPRINTS_1", Token::DownstreamPeerChainFingerprints1); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CHAIN_SERIALS", Token::DownstreamPeerChainSerials); + trie_mapstr!(trie, "DOWNSTREAM_PEER_ISSUER", Token::DownstreamPeerIssuer); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CERT", Token::DownstreamPeerCert); + trie_mapstr!(trie, "DOWNSTREAM_TRANSPORT_FAILURE_REASON", Token::DownstreamTransportFailureReason); + trie_mapstr!(trie, "UPSTREAM_TRANSPORT_FAILURE_REASON", Token::UpstreamTransportFailureReason); + trie_mapstr!(trie, "HOSTNAME", Token::Hostname); + trie_mapstr!(trie, "FILTER_CHAIN_NAME", Token::FilterChainName); + trie_mapstr!(trie, "VIRTUAL_CLUSTER_NAME", Token::VirtualClusterName); + trie_mapstr!(trie, "TLS_JA3_FINGERPRINT", Token::TlsJa3Fingerprint); + trie_mapstr!(trie, "UNIQUE_ID", Token::UniqueId); + trie_mapstr!(trie, "STREAM_ID", Token::StreamId); + trie_mapstr!(trie, "START_TIME", Token::StartTime); + trie_mapstr!(trie, "START_TIME_LOCAL", Token::StartTimeLocal); + trie_mapstr!(trie, "EMIT_TIME", Token::EmitTime); + trie_mapstr!(trie, "EMIT_TIME_LOCAL", Token::EmitTimeLocal); + trie_mapstr!(trie, "DYNAMIC_METADATA", Token::DynamicMetadata); + trie_mapstr!(trie, "CLUSTER_METADATA", Token::ClusterMetadata); + trie_mapstr!(trie, "UPSTREAM_METADATA", Token::UpstreamMetadata); + trie_mapstr!(trie, "FILTER_STATE", Token::FilterState); + trie_mapstr!(trie, "UPSTREAM_FILTER_STATE", Token::UpstreamFilterState); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CERT_V_START", Token::DownstreamPeerCertVStart); + trie_mapstr!(trie, "DOWNSTREAM_PEER_CERT_V_END", Token::DownstreamPeerCertVEnd); + trie_mapstr!(trie, "UPSTREAM_PEER_CERT_V_START", Token::UpstreamPeerCertVStart); + 
trie_mapstr!(trie, "UPSTREAM_PEER_CERT_V_END", Token::UpstreamPeerCertVEnd); + trie_mapstr!(trie, "ENVIRONMENT", Token::Environment); + trie_mapstr!(trie, "UPSTREAM_CONNECTION_POOL_READY_DURATION", Token::UpstreamConnectionPoolReadyDuration); + trie_mapstr!(trie, "REQ", Token::Request, true); // %REQ(USER-AGENT)% + trie_mapstr!(trie, "RESP", Token::Response, true); // %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% + trie + }; +} + +pub struct EnvoyGrammar; + +impl EnvoyGrammar { + fn parse_request(arg: &str) -> Result { + match ENVOY_REQ_ARGS.find_longest_prefix(arg.bytes()) { + Some((t, _, _)) => Ok(t.clone()), + None => { + let name = + HeaderName::from_bytes(arg.as_bytes()).map_err(|_| FormatError::InvalidRequestArg(arg.into()))?; + Ok(ReqArg::Header(name)) + }, + } + } + + fn parse_response(arg: &str) -> Result { + match ENVOY_RESP_ARGS.find_longest_prefix(arg.bytes()) { + Some((t, _, _)) => Ok(t.clone()), + None => { + let name = + HeaderName::from_bytes(arg.as_bytes()).map_err(|_| FormatError::InvalidResponseArg(arg.into()))?; + Ok(RespArg::Header(name)) + }, + } + } + + fn extract_token_arg(input: &str) -> Result<(&str, usize), FormatError> { + if let Some(rest) = input.strip_prefix('(') { + if let Some(end) = rest.find(')') { + let arg = &rest[..end]; + if arg.is_empty() { + return Err(FormatError::EmptyArgument(input.into())); + } + let total_len = end + 2; // '(' + arg.len() + ')' + return Ok((arg, total_len)); + } + } + Err(FormatError::MissingBracket(input.into())) + } +} + +impl Grammar for EnvoyGrammar { + fn parse(input: &str) -> Result, FormatError> { + let mut parts = Vec::new(); + let mut literal_start = 0; // Indice di inizio del literal corrente + let mut i = 0; + + while i < input.len() { + let mut longest_placeholder: Option<(Token, usize, Option)> = None; + let mut skip = None; + + // finst the longest placeholder starting from the current index i + // + if input[i..].starts_with("%") { + let remainder = &input[i + 1..]; + if 
remainder.starts_with("%") { + skip = Some(2); + } else if let Some((placeholder, placeholder_len, has_arg)) = + ENVOY_PATTERNS.find_longest_prefix(remainder.bytes()) + { + let after_placeholder = &remainder[*placeholder_len..]; + // placeholder found + if *has_arg { + let (arg_value, arg_len) = Self::extract_token_arg(after_placeholder)?; + + if longest_placeholder.is_none() || *placeholder_len > longest_placeholder.as_ref().unwrap().1 { + match placeholder { + Token::Request => { + let arg = Self::parse_request(&arg_value)?; + longest_placeholder = + Some((*placeholder, *placeholder_len, Some(TokenArgument::Request(arg)))); + }, + Token::Response => { + let arg = Self::parse_response(&arg_value)?; + longest_placeholder = + Some((*placeholder, *placeholder_len, Some(TokenArgument::Response(arg)))); + }, + _ => (), + } + } + + if !after_placeholder[arg_len..].starts_with("%") { + return Err(FormatError::MissingDelimiter(input[i + *placeholder_len..].into())); + } + + skip = Some(2 + *placeholder_len + arg_len); + } else { + longest_placeholder = Some((*placeholder, *placeholder_len, None)); + if !after_placeholder.starts_with("%") { + return Err(FormatError::MissingDelimiter(input[i + *placeholder_len..].into())); + } + skip = Some(2 + *placeholder_len); + } + } else { + return Err(FormatError::UnsupportedPattern(input[i..].into())); + } + } + + if let Some(placeholder) = longest_placeholder.as_ref() { + // placeholder found + if i > literal_start { + let literal_text = &input[literal_start..i]; + parts.push(Template::Literal(literal_text.to_string())); + } + + // Add this placeholder. + parts.push(Template::Placeholder(placeholder.0.into(), placeholder.2.clone())); + + // advance the index beyond the current placeholder and possibly its argument.
+ i += skip.unwrap(); + + literal_start = i; + } else { + /* skip the specified number of bytes, or by default the next char */ + i += skip.unwrap_or(input[i..].chars().next().map(|c| c.len_utf8()).unwrap_or(0)); + } + } + + // if there's some text remaining, it's a literal + if i > literal_start { + let literal_text = &input[literal_start..i]; + + parts.push(Template::Literal(literal_text.replace("%%", "%"))); + } + + Ok(parts) + } +} + +// Unit tests module +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_only_literals() { + let input = "This is a plain literal string."; + let expected = vec![Template::Literal("This is a plain literal string.".into())]; + let actual = EnvoyGrammar::parse(input).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_parse_only_placeholders() { + let input = "%START_TIME%%PROTOCOL%"; + let expected = + vec![Template::Placeholder(Token::StartTime, None), Template::Placeholder(Token::Protocol, None)]; + let actual = EnvoyGrammar::parse(input).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_parse_mixed_literal_and_placeholders() { + let input = "Start %REQ(:METHOD)% middle %PROTOCOL% end."; + let expected = vec![ + Template::Literal("Start ".into()), + Template::Placeholder(Token::Request, Some(TokenArgument::Request(ReqArg::Method))), + Template::Literal(" middle ".into()), + Template::Placeholder(Token::Protocol, None), + Template::Literal(" end.".into()), + ]; + let actual = EnvoyGrammar::parse(input).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_parse_starts_with_placeholder() { + let input = "%START_TIME% literal after."; + let expected = vec![Template::Placeholder(Token::StartTime, None), Template::Literal(" literal after.".into())]; + let actual = EnvoyGrammar::parse(input).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_parse_ends_with_placeholder() { + let input = "Literal before %PROTOCOL%"; + let expected =
vec![Template::Literal("Literal before ".into()), Template::Placeholder(Token::Protocol, None)]; + let actual = EnvoyGrammar::parse(input).unwrap(); + assert_eq!(actual, expected); + } + + #[test] + fn test_parse_empty_string() { + let input = ""; + let expected: Vec