Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 130 additions & 6 deletions src/common/http/server_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vector_config::configurable_component;
use vector_lib::{
TimeZone, compile_vrl,
event::{Event, LogEvent, VrlTarget},
lookup::OwnedTargetPath,
sensitive_string::SensitiveString,
};
use vector_vrl_metrics::MetricsStorage;
Expand Down Expand Up @@ -59,6 +60,16 @@ pub enum HttpServerAuthConfig {
/// The VRL boolean expression.
source: String,
},

/// Custom authentication using VRL code, with event enrichment.
///
/// Like `custom`, validates the request using a VRL boolean expression.
/// Additionally, metadata fields written via `%field = value` in the VRL
/// program are extracted and injected into every authenticated event.
CustomEnriching {
/// The VRL boolean expression. May write `%field = value` to enrich events.
source: String,
},
}

// Custom deserializer implementation to default `strategy` to `basic`
Expand Down Expand Up @@ -120,7 +131,16 @@ impl<'de> Deserialize<'de> for HttpServerAuthConfig {
.ok_or_else(|| Error::missing_field("source"))?;
Ok(HttpServerAuthConfig::Custom { source })
}
_ => Err(Error::unknown_variant(strategy, &["basic", "custom"])),
"custom_enriching" => {
let source = fields
.remove("source")
.ok_or_else(|| Error::missing_field("source"))?;
Ok(HttpServerAuthConfig::CustomEnriching { source })
}
_ => Err(Error::unknown_variant(
strategy,
&["basic", "custom", "custom_enriching"],
)),
}
}
}
Expand Down Expand Up @@ -171,6 +191,34 @@ impl HttpServerAuthConfig {

Ok(HttpServerAuthMatcher::Vrl { program })
}
HttpServerAuthConfig::CustomEnriching { source } => {
let state = TypeState::default();

let mut config = CompileConfig::default();
config.set_custom(enrichment_tables.clone());
config.set_custom(metrics_storage.clone());
// Lock the event body (.field) as read-only, but leave metadata (%field) writable
// so the VRL program can enrich authenticated events via %field = value.
config.set_read_only_path(OwnedTargetPath::event_root(), true);

let CompilationResult {
program,
warnings,
config: _,
} = compile_vrl(source, &vector_vrl_functions::all(), &state, config)
.map_err(|diagnostics| format_vrl_diagnostics(source, diagnostics))?;

if !program.final_type_info().result.is_boolean() {
return Err("VRL conditions must return a boolean.".into());
}

if !warnings.is_empty() {
let warnings = format_vrl_diagnostics(source, warnings);
warn!(message = "VRL compilation warning.", %warnings);
}

Ok(HttpServerAuthMatcher::VrlEnriching { program })
}
}
}
}
Expand All @@ -187,21 +235,28 @@ pub enum HttpServerAuthMatcher {
/// Compiled VRL script
program: Program,
},
/// Like `Vrl`, but metadata (`%field`) writes in the program are extracted
/// and returned to the caller for injection into authenticated events.
VrlEnriching {
/// Compiled VRL script
program: Program,
},
}

impl HttpServerAuthMatcher {
/// Compares passed headers to the matcher
/// Validates the request. Returns `Ok(Some(enrichment))` for `VrlEnriching` when auth passes
/// and the VRL program wrote `%field` values; returns `Ok(None)` for all other matchers.
pub fn handle_auth(
&self,
address: Option<&SocketAddr>,
headers: &HeaderMap<HeaderValue>,
path: &str,
) -> Result<(), ErrorMessage> {
) -> Result<Option<ObjectMap>, ErrorMessage> {
match self {
HttpServerAuthMatcher::AuthHeader(expected, err_message) => {
if let Some(header) = headers.get(AUTHORIZATION) {
if expected == header {
Ok(())
Ok(None)
} else {
Err(ErrorMessage::new(
StatusCode::UNAUTHORIZED,
Expand All @@ -216,7 +271,11 @@ impl HttpServerAuthMatcher {
}
}
HttpServerAuthMatcher::Vrl { program } => {
self.handle_vrl_auth(address, headers, path, program)
self.handle_vrl_auth(address, headers, path, program)?;
Ok(None)
}
HttpServerAuthMatcher::VrlEnriching { program } => {
self.handle_vrl_enriching_auth(address, headers, path, program)
}
}
}
Expand Down Expand Up @@ -279,6 +338,71 @@ impl HttpServerAuthMatcher {
)),
}
}

fn handle_vrl_enriching_auth(
&self,
address: Option<&SocketAddr>,
headers: &HeaderMap<HeaderValue>,
path: &str,
program: &Program,
) -> Result<Option<ObjectMap>, ErrorMessage> {
let mut target = VrlTarget::new(
Event::Log(LogEvent::from_map(
ObjectMap::from([
(
"headers".into(),
Value::Object(
headers
.iter()
.map(|(k, v)| {
(
KeyString::from(k.to_string()),
Value::Bytes(Bytes::copy_from_slice(v.as_bytes())),
)
})
.collect::<ObjectMap>(),
),
),
(
"address".into(),
address.map_or(Value::Null, |a| Value::from(a.ip().to_string())),
),
("path".into(), Value::from(path.to_owned())),
]),
Default::default(),
)),
program.info(),
false,
);
let timezone = TimeZone::default();

let result = Runtime::default().resolve(&mut target, program, &timezone);
match result.map_err(|e| {
warn!("Handling auth failed: {}", e);
ErrorMessage::new(StatusCode::UNAUTHORIZED, "Auth failed".to_owned())
})? {
vrl::core::Value::Boolean(true) => {
let enrichment = if let VrlTarget::LogEvent(_, metadata) = &target {
metadata
.value()
.as_object()
.filter(|m| !m.is_empty())
.cloned()
} else {
None
};
Ok(enrichment)
}
vrl::core::Value::Boolean(false) => Err(ErrorMessage::new(
StatusCode::UNAUTHORIZED,
"Auth failed".to_owned(),
)),
_ => Err(ErrorMessage::new(
StatusCode::UNAUTHORIZED,
"Invalid return value".to_owned(),
)),
}
}
}

#[cfg(test)]
Expand All @@ -294,7 +418,7 @@ mod tests {
HttpServerAuthMatcher::AuthHeader(header_value, error_message) => {
(header_value, error_message)
}
HttpServerAuthMatcher::Vrl { .. } => {
HttpServerAuthMatcher::Vrl { .. } | HttpServerAuthMatcher::VrlEnriching { .. } => {
panic!("Expected HttpServerAuthMatcher::AuthHeader")
}
}
Expand Down
22 changes: 21 additions & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use vector_lib::{
lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
schema::Definition,
};
use vrl::value::{Kind, kind::Collection};
use vrl::value::{Kind, ObjectMap, kind::Collection};
use warp::http::HeaderMap;

use crate::{
Expand Down Expand Up @@ -521,6 +521,26 @@ impl HttpSource for SimpleHttpSource {
fn enable_source_ip(&self) -> bool {
self.host_key.path.is_some()
}

/// Injects `%field` enrichment collected from a `CustomEnriching` auth VRL program into events.
///
/// - Legacy namespace: fields are written to the event body (insert-if-empty semantics).
/// - Vector namespace: fields are written to event metadata under `http_server.<field>`.
fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
for event in events.iter_mut() {
if let Event::Log(log) = event {
for (key, value) in &enrichment {
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(key.as_str()))),
path!(key.as_str()),
value.clone(),
);
}
}
}
}
}

#[cfg(test)]
Expand Down
21 changes: 17 additions & 4 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vector_lib::{
config::SourceAcknowledgementsConfig,
event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
};
use vrl::value::ObjectMap;
use warp::{
Filter,
filters::{
Expand Down Expand Up @@ -56,6 +57,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
path: &str,
) -> Result<Vec<Event>, ErrorMessage>;

/// Called after `enrich_events` when auth returned enrichment fields.
/// The default no-op is used by sources that don't support `CustomEnriching` auth.
fn inject_auth_enrichment(&self, _events: &mut [Event], _enrichment: ObjectMap) {}

fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
decompress_body(encoding_header, body)
}
Expand Down Expand Up @@ -132,23 +137,27 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
let http_path = path.as_str();
let events = auth_matcher
.as_ref()
.map_or(Ok(()), |a| {
.map_or(Ok(None), |a| {
a.handle_auth(
addr.as_ref().map(|a| a.0).as_ref(),
&headers,
path.as_str(),
)
})
.and_then(|()| self.decode(encoding_header.as_deref(), body))
.and_then(|body| {
.and_then(|auth_enrichment| {
self.decode(encoding_header.as_deref(), body)
.map(|body| (body, auth_enrichment))
})
.and_then(|(body, auth_enrichment)| {
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});
self.build_events(body, &headers, &query_parameters, path.as_str())
.map(|events| (events, auth_enrichment))
})
.map(|mut events| {
.map(|(mut events, auth_enrichment)| {
emit!(HttpEventsReceived {
count: events.len(),
byte_size: events.estimated_json_encoded_size_of(),
Expand All @@ -166,6 +175,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.as_ref(),
);

if let Some(enrichment) = auth_enrichment {
self.inject_auth_enrichment(&mut events, enrichment);
}

events
});

Expand Down
Loading