Skip to content

Commit cf06ed7

Browse files
committed
feat(http_server source): add custom_enriching auth type
1 parent 8d83e17 commit cf06ed7

3 files changed

Lines changed: 168 additions & 11 deletions

File tree

src/common/http/server_auth.rs

Lines changed: 130 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use vector_config::configurable_component;
1212
use vector_lib::{
1313
TimeZone, compile_vrl,
1414
event::{Event, LogEvent, VrlTarget},
15+
lookup::OwnedTargetPath,
1516
sensitive_string::SensitiveString,
1617
};
1718
use vector_vrl_metrics::MetricsStorage;
@@ -59,6 +60,16 @@ pub enum HttpServerAuthConfig {
5960
/// The VRL boolean expression.
6061
source: String,
6162
},
63+
64+
/// Custom authentication using VRL code, with event enrichment.
65+
///
66+
/// Like `custom`, validates the request using a VRL boolean expression.
67+
/// Additionally, metadata fields written via `%field = value` in the VRL
68+
/// program are extracted and injected into every authenticated event.
69+
CustomEnriching {
70+
/// The VRL boolean expression. May write `%field = value` to enrich events.
71+
source: String,
72+
},
6273
}
6374

6475
// Custom deserializer implementation to default `strategy` to `basic`
@@ -120,7 +131,16 @@ impl<'de> Deserialize<'de> for HttpServerAuthConfig {
120131
.ok_or_else(|| Error::missing_field("source"))?;
121132
Ok(HttpServerAuthConfig::Custom { source })
122133
}
123-
_ => Err(Error::unknown_variant(strategy, &["basic", "custom"])),
134+
"custom_enriching" => {
135+
let source = fields
136+
.remove("source")
137+
.ok_or_else(|| Error::missing_field("source"))?;
138+
Ok(HttpServerAuthConfig::CustomEnriching { source })
139+
}
140+
_ => Err(Error::unknown_variant(
141+
strategy,
142+
&["basic", "custom", "custom_enriching"],
143+
)),
124144
}
125145
}
126146
}
@@ -171,6 +191,34 @@ impl HttpServerAuthConfig {
171191

172192
Ok(HttpServerAuthMatcher::Vrl { program })
173193
}
194+
HttpServerAuthConfig::CustomEnriching { source } => {
195+
let state = TypeState::default();
196+
197+
let mut config = CompileConfig::default();
198+
config.set_custom(enrichment_tables.clone());
199+
config.set_custom(metrics_storage.clone());
200+
// Lock the event body (.field) as read-only, but leave metadata (%field) writable
201+
// so the VRL program can enrich authenticated events via %field = value.
202+
config.set_read_only_path(OwnedTargetPath::event_root(), true);
203+
204+
let CompilationResult {
205+
program,
206+
warnings,
207+
config: _,
208+
} = compile_vrl(source, &vector_vrl_functions::all(), &state, config)
209+
.map_err(|diagnostics| format_vrl_diagnostics(source, diagnostics))?;
210+
211+
if !program.final_type_info().result.is_boolean() {
212+
return Err("VRL conditions must return a boolean.".into());
213+
}
214+
215+
if !warnings.is_empty() {
216+
let warnings = format_vrl_diagnostics(source, warnings);
217+
warn!(message = "VRL compilation warning.", %warnings);
218+
}
219+
220+
Ok(HttpServerAuthMatcher::VrlEnriching { program })
221+
}
174222
}
175223
}
176224
}
@@ -187,21 +235,28 @@ pub enum HttpServerAuthMatcher {
187235
/// Compiled VRL script
188236
program: Program,
189237
},
238+
/// Like `Vrl`, but metadata (`%field`) writes in the program are extracted
239+
/// and returned to the caller for injection into authenticated events.
240+
VrlEnriching {
241+
/// Compiled VRL script
242+
program: Program,
243+
},
190244
}
191245

192246
impl HttpServerAuthMatcher {
193-
/// Compares passed headers to the matcher
247+
/// Validates the request. Returns `Ok(Some(enrichment))` for `VrlEnriching` when auth passes
248+
/// and the VRL program wrote `%field` values; returns `Ok(None)` for all other matchers.
194249
pub fn handle_auth(
195250
&self,
196251
address: Option<&SocketAddr>,
197252
headers: &HeaderMap<HeaderValue>,
198253
path: &str,
199-
) -> Result<(), ErrorMessage> {
254+
) -> Result<Option<ObjectMap>, ErrorMessage> {
200255
match self {
201256
HttpServerAuthMatcher::AuthHeader(expected, err_message) => {
202257
if let Some(header) = headers.get(AUTHORIZATION) {
203258
if expected == header {
204-
Ok(())
259+
Ok(None)
205260
} else {
206261
Err(ErrorMessage::new(
207262
StatusCode::UNAUTHORIZED,
@@ -216,7 +271,11 @@ impl HttpServerAuthMatcher {
216271
}
217272
}
218273
HttpServerAuthMatcher::Vrl { program } => {
219-
self.handle_vrl_auth(address, headers, path, program)
274+
self.handle_vrl_auth(address, headers, path, program)?;
275+
Ok(None)
276+
}
277+
HttpServerAuthMatcher::VrlEnriching { program } => {
278+
self.handle_vrl_enriching_auth(address, headers, path, program)
220279
}
221280
}
222281
}
@@ -279,6 +338,71 @@ impl HttpServerAuthMatcher {
279338
)),
280339
}
281340
}
341+
342+
fn handle_vrl_enriching_auth(
343+
&self,
344+
address: Option<&SocketAddr>,
345+
headers: &HeaderMap<HeaderValue>,
346+
path: &str,
347+
program: &Program,
348+
) -> Result<Option<ObjectMap>, ErrorMessage> {
349+
let mut target = VrlTarget::new(
350+
Event::Log(LogEvent::from_map(
351+
ObjectMap::from([
352+
(
353+
"headers".into(),
354+
Value::Object(
355+
headers
356+
.iter()
357+
.map(|(k, v)| {
358+
(
359+
KeyString::from(k.to_string()),
360+
Value::Bytes(Bytes::copy_from_slice(v.as_bytes())),
361+
)
362+
})
363+
.collect::<ObjectMap>(),
364+
),
365+
),
366+
(
367+
"address".into(),
368+
address.map_or(Value::Null, |a| Value::from(a.ip().to_string())),
369+
),
370+
("path".into(), Value::from(path.to_owned())),
371+
]),
372+
Default::default(),
373+
)),
374+
program.info(),
375+
false,
376+
);
377+
let timezone = TimeZone::default();
378+
379+
let result = Runtime::default().resolve(&mut target, program, &timezone);
380+
match result.map_err(|e| {
381+
warn!("Handling auth failed: {}", e);
382+
ErrorMessage::new(StatusCode::UNAUTHORIZED, "Auth failed".to_owned())
383+
})? {
384+
vrl::core::Value::Boolean(true) => {
385+
let enrichment = if let VrlTarget::LogEvent(_, metadata) = &target {
386+
metadata
387+
.value()
388+
.as_object()
389+
.filter(|m| !m.is_empty())
390+
.cloned()
391+
} else {
392+
None
393+
};
394+
Ok(enrichment)
395+
}
396+
vrl::core::Value::Boolean(false) => Err(ErrorMessage::new(
397+
StatusCode::UNAUTHORIZED,
398+
"Auth failed".to_owned(),
399+
)),
400+
_ => Err(ErrorMessage::new(
401+
StatusCode::UNAUTHORIZED,
402+
"Invalid return value".to_owned(),
403+
)),
404+
}
405+
}
282406
}
283407

284408
#[cfg(test)]
@@ -294,7 +418,7 @@ mod tests {
294418
HttpServerAuthMatcher::AuthHeader(header_value, error_message) => {
295419
(header_value, error_message)
296420
}
297-
HttpServerAuthMatcher::Vrl { .. } => {
421+
HttpServerAuthMatcher::Vrl { .. } | HttpServerAuthMatcher::VrlEnriching { .. } => {
298422
panic!("Expected HttpServerAuthMatcher::AuthHeader")
299423
}
300424
}

src/sources/http_server.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use vector_lib::{
1616
lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
1717
schema::Definition,
1818
};
19-
use vrl::value::{Kind, kind::Collection};
19+
use vrl::value::{Kind, ObjectMap, kind::Collection};
2020
use warp::http::HeaderMap;
2121

2222
use crate::{
@@ -521,6 +521,26 @@ impl HttpSource for SimpleHttpSource {
521521
fn enable_source_ip(&self) -> bool {
522522
self.host_key.path.is_some()
523523
}
524+
525+
/// Injects `%field` enrichment collected from a `CustomEnriching` auth VRL program into events.
526+
///
527+
/// - Legacy namespace: fields are written to the event body (insert-if-empty semantics).
528+
/// - Vector namespace: fields are written to event metadata under `http_server.<field>`.
529+
fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
530+
for event in events.iter_mut() {
531+
if let Event::Log(log) = event {
532+
for (key, value) in &enrichment {
533+
self.log_namespace.insert_source_metadata(
534+
SimpleHttpConfig::NAME,
535+
log,
536+
Some(LegacyKey::InsertIfEmpty(path!(key.as_str()))),
537+
path!(key.as_str()),
538+
value.clone(),
539+
);
540+
}
541+
}
542+
}
543+
}
524544
}
525545

526546
#[cfg(test)]

src/sources/util/http/prelude.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use vector_lib::{
1111
config::SourceAcknowledgementsConfig,
1212
event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
1313
};
14+
use vrl::value::ObjectMap;
1415
use warp::{
1516
Filter,
1617
filters::{
@@ -56,6 +57,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
5657
path: &str,
5758
) -> Result<Vec<Event>, ErrorMessage>;
5859

60+
/// Called after `enrich_events` when auth returned enrichment fields.
61+
/// The default no-op is used by sources that don't support `CustomEnriching` auth.
62+
fn inject_auth_enrichment(&self, _events: &mut [Event], _enrichment: ObjectMap) {}
63+
5964
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
6065
decompress_body(encoding_header, body)
6166
}
@@ -132,23 +137,27 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
132137
let http_path = path.as_str();
133138
let events = auth_matcher
134139
.as_ref()
135-
.map_or(Ok(()), |a| {
140+
.map_or(Ok(None), |a| {
136141
a.handle_auth(
137142
addr.as_ref().map(|a| a.0).as_ref(),
138143
&headers,
139144
path.as_str(),
140145
)
141146
})
142-
.and_then(|()| self.decode(encoding_header.as_deref(), body))
143-
.and_then(|body| {
147+
.and_then(|auth_enrichment| {
148+
self.decode(encoding_header.as_deref(), body)
149+
.map(|body| (body, auth_enrichment))
150+
})
151+
.and_then(|(body, auth_enrichment)| {
144152
emit!(HttpBytesReceived {
145153
byte_size: body.len(),
146154
http_path,
147155
protocol,
148156
});
149157
self.build_events(body, &headers, &query_parameters, path.as_str())
158+
.map(|events| (events, auth_enrichment))
150159
})
151-
.map(|mut events| {
160+
.map(|(mut events, auth_enrichment)| {
152161
emit!(HttpEventsReceived {
153162
count: events.len(),
154163
byte_size: events.estimated_json_encoded_size_of(),
@@ -166,6 +175,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
166175
.as_ref(),
167176
);
168177

178+
if let Some(enrichment) = auth_enrichment {
179+
self.inject_auth_enrichment(&mut events, enrichment);
180+
}
181+
169182
events
170183
});
171184

0 commit comments

Comments
 (0)