diff --git a/src/common/http/server_auth.rs b/src/common/http/server_auth.rs index 7d482796f99f3..e152d78fd70bb 100644 --- a/src/common/http/server_auth.rs +++ b/src/common/http/server_auth.rs @@ -12,6 +12,7 @@ use vector_config::configurable_component; use vector_lib::{ TimeZone, compile_vrl, event::{Event, LogEvent, VrlTarget}, + lookup::OwnedTargetPath, sensitive_string::SensitiveString, }; use vector_vrl_metrics::MetricsStorage; @@ -59,6 +60,16 @@ pub enum HttpServerAuthConfig { /// The VRL boolean expression. source: String, }, + + /// Custom authentication using VRL code, with event enrichment. + /// + /// Like `custom`, validates the request using a VRL boolean expression. + /// Additionally, metadata fields written via `%field = value` in the VRL + /// program are extracted and injected into every authenticated event. + CustomEnriching { + /// The VRL boolean expression. May write `%field = value` to enrich events. + source: String, + }, } // Custom deserializer implementation to default `strategy` to `basic` @@ -120,7 +131,16 @@ impl<'de> Deserialize<'de> for HttpServerAuthConfig { .ok_or_else(|| Error::missing_field("source"))?; Ok(HttpServerAuthConfig::Custom { source }) } - _ => Err(Error::unknown_variant(strategy, &["basic", "custom"])), + "custom_enriching" => { + let source = fields + .remove("source") + .ok_or_else(|| Error::missing_field("source"))?; + Ok(HttpServerAuthConfig::CustomEnriching { source }) + } + _ => Err(Error::unknown_variant( + strategy, + &["basic", "custom", "custom_enriching"], + )), } } } @@ -171,6 +191,34 @@ impl HttpServerAuthConfig { Ok(HttpServerAuthMatcher::Vrl { program }) } + HttpServerAuthConfig::CustomEnriching { source } => { + let state = TypeState::default(); + + let mut config = CompileConfig::default(); + config.set_custom(enrichment_tables.clone()); + config.set_custom(metrics_storage.clone()); + // Lock the event body (.field) as read-only, but leave metadata (%field) writable + // so the VRL program can enrich authenticated events via %field = value. + config.set_read_only_path(OwnedTargetPath::event_root(), true); + + let CompilationResult { + program, + warnings, + config: _, + } = compile_vrl(source, &vector_vrl_functions::all(), &state, config) + .map_err(|diagnostics| format_vrl_diagnostics(source, diagnostics))?; + + if !program.final_type_info().result.is_boolean() { + return Err("VRL conditions must return a boolean.".into()); + } + + if !warnings.is_empty() { + let warnings = format_vrl_diagnostics(source, warnings); + warn!(message = "VRL compilation warning.", %warnings); + } + + Ok(HttpServerAuthMatcher::VrlEnriching { program }) + } } } } @@ -187,21 +235,28 @@ pub enum HttpServerAuthMatcher { /// Compiled VRL script program: Program, }, + /// Like `Vrl`, but metadata (`%field`) writes in the program are extracted + /// and returned to the caller for injection into authenticated events. + VrlEnriching { + /// Compiled VRL script + program: Program, + }, } impl HttpServerAuthMatcher { - /// Compares passed headers to the matcher + /// Validates the request. Returns `Ok(Some(enrichment))` for `VrlEnriching` when auth passes + /// and the VRL program wrote `%field` values; returns `Ok(None)` for all other matchers. pub fn handle_auth( &self, address: Option<&SocketAddr>, headers: &HeaderMap, path: &str, - ) -> Result<(), ErrorMessage> { + ) -> Result, ErrorMessage> { match self { HttpServerAuthMatcher::AuthHeader(expected, err_message) => { if let Some(header) = headers.get(AUTHORIZATION) { if expected == header { - Ok(()) + Ok(None) } else { Err(ErrorMessage::new( StatusCode::UNAUTHORIZED, @@ -216,7 +271,11 @@ impl HttpServerAuthMatcher { } } HttpServerAuthMatcher::Vrl { program } => { - self.handle_vrl_auth(address, headers, path, program) + self.handle_vrl_auth(address, headers, path, program)?; + Ok(None) + } + HttpServerAuthMatcher::VrlEnriching { program } => { + self.handle_vrl_enriching_auth(address, headers, path, program) } } } @@ -279,6 +338,71 @@ impl HttpServerAuthMatcher { )), } } + + fn handle_vrl_enriching_auth( + &self, + address: Option<&SocketAddr>, + headers: &HeaderMap, + path: &str, + program: &Program, + ) -> Result, ErrorMessage> { + let mut target = VrlTarget::new( + Event::Log(LogEvent::from_map( + ObjectMap::from([ + ( + "headers".into(), + Value::Object( + headers + .iter() + .map(|(k, v)| { + ( + KeyString::from(k.to_string()), + Value::Bytes(Bytes::copy_from_slice(v.as_bytes())), + ) + }) + .collect::(), + ), + ), + ( + "address".into(), + address.map_or(Value::Null, |a| Value::from(a.ip().to_string())), + ), + ("path".into(), Value::from(path.to_owned())), + ]), + Default::default(), + )), + program.info(), + false, + ); + let timezone = TimeZone::default(); + + let result = Runtime::default().resolve(&mut target, program, &timezone); + match result.map_err(|e| { + warn!("Handling auth failed: {}", e); + ErrorMessage::new(StatusCode::UNAUTHORIZED, "Auth failed".to_owned()) + })? { + vrl::core::Value::Boolean(true) => { + let enrichment = if let VrlTarget::LogEvent(_, metadata) = &target { + metadata + .value() + .as_object() + .filter(|m| !m.is_empty()) + .cloned() + } else { + None + }; + Ok(enrichment) + } + vrl::core::Value::Boolean(false) => Err(ErrorMessage::new( + StatusCode::UNAUTHORIZED, + "Auth failed".to_owned(), + )), + _ => Err(ErrorMessage::new( + StatusCode::UNAUTHORIZED, + "Invalid return value".to_owned(), + )), + } + } } #[cfg(test)] @@ -294,7 +418,7 @@ mod tests { HttpServerAuthMatcher::AuthHeader(header_value, error_message) => { (header_value, error_message) } - HttpServerAuthMatcher::Vrl { .. } => { + HttpServerAuthMatcher::Vrl { .. } | HttpServerAuthMatcher::VrlEnriching { .. } => { panic!("Expected HttpServerAuthMatcher::AuthHeader") } } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 7c444284adc6c..efd666329d247 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -16,7 +16,7 @@ use vector_lib::{ lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}, schema::Definition, }; -use vrl::value::{Kind, kind::Collection}; +use vrl::value::{Kind, ObjectMap, kind::Collection}; use warp::http::HeaderMap; use crate::{ @@ -521,6 +521,26 @@ impl HttpSource for SimpleHttpSource { fn enable_source_ip(&self) -> bool { self.host_key.path.is_some() } + + /// Injects `%field` enrichment collected from a `CustomEnriching` auth VRL program into events. + /// + /// - Legacy namespace: fields are written to the event body (insert-if-empty semantics). + /// - Vector namespace: fields are written to event metadata under `http_server.`. + fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) { + for event in events.iter_mut() { + if let Event::Log(log) = event { + for (key, value) in &enrichment { + self.log_namespace.insert_source_metadata( + SimpleHttpConfig::NAME, + log, + Some(LegacyKey::InsertIfEmpty(path!(key.as_str()))), + path!(key.as_str()), + value.clone(), + ); + } + } + } + } } #[cfg(test)] diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 3414ea00264b0..b027a90a404f6 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -11,6 +11,7 @@ use vector_lib::{ config::SourceAcknowledgementsConfig, event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event}, }; +use vrl::value::ObjectMap; use warp::{ Filter, filters::{ @@ -56,6 +57,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static { path: &str, ) -> Result, ErrorMessage>; + /// Called after `enrich_events` when auth returned enrichment fields. + /// The default no-op is used by sources that don't support `CustomEnriching` auth. + fn inject_auth_enrichment(&self, _events: &mut [Event], _enrichment: ObjectMap) {} + fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result { decompress_body(encoding_header, body) } @@ -132,23 +137,27 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let http_path = path.as_str(); let events = auth_matcher .as_ref() - .map_or(Ok(()), |a| { + .map_or(Ok(None), |a| { a.handle_auth( addr.as_ref().map(|a| a.0).as_ref(), &headers, path.as_str(), ) }) - .and_then(|()| self.decode(encoding_header.as_deref(), body)) - .and_then(|body| { + .and_then(|auth_enrichment| { + self.decode(encoding_header.as_deref(), body) + .map(|body| (body, auth_enrichment)) + }) + .and_then(|(body, auth_enrichment)| { emit!(HttpBytesReceived { byte_size: body.len(), http_path, protocol, }); self.build_events(body, &headers, &query_parameters, path.as_str()) + .map(|events| (events, auth_enrichment)) }) - .map(|mut events| { + .map(|(mut events, auth_enrichment)| { emit!(HttpEventsReceived { count: events.len(), byte_size: events.estimated_json_encoded_size_of(), @@ -166,6 +175,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .as_ref(), ); + if let Some(enrichment) = auth_enrichment { + self.inject_auth_enrichment(&mut events, enrichment); + } + events });