changelog.d/25441_llmobs_endpoint.feature.md: 3 changes (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
The `datadog_agent` source now accepts LLM Observability (LLMObs) telemetry at `/api/v2/llmobs`. When `multiple_outputs` is enabled, LLMObs span events are available as log events on the `llmobs` output port.

authors: ronitanilkumar
src/sources/datadog_agent/llmobs.rs: 157 changes (157 additions, 0 deletions)
@@ -0,0 +1,157 @@
use bytes::Bytes;
use http::StatusCode;
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};

use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
use crate::{
    common::http::ErrorMessage,
    event::{Event, LogEvent},
    internal_events::DatadogAgentJsonParseError,
};

pub(super) fn build_warp_filter(
    handler: RequestHandler,
    source: DatadogAgentSource,
) -> BoxedFilter<(Response,)> {
    warp::post()
        .and(path!("api" / "v2" / "llmobs" / ..))
Comment thread (pront marked this conversation as resolved; outdated)

P1: Register the EVP proxy route used in agent mode

When an LLMObs SDK is configured to send data through its Datadog Agent, it posts spans to `/evp_proxy/v2/api/v2/llmobs` (dd-trace-py, for example, defines this as its proxied endpoint) rather than directly to `/api/v2/llmobs`. This filter matches only the direct intake path, so standard agent-mode clients pointed at Vector receive a 404, and the feature works only after a nonstandard endpoint override.

Author reply:

Fixed in a4f25b6. A second warp filter for `/evp_proxy/v2/api/v2/llmobs` is now registered and combined with the direct route via `.or().unify().boxed()`.

        .and(warp::path::full())
        .and(warp::header::optional::<String>("content-encoding"))
        .and(warp::header::optional::<String>("dd-api-key"))
        .and(warp::query::<ApiKeyQueryParams>())
        .and(warp::body::bytes())
        .and_then(
            move |path: FullPath,
                  encoding_header: Option<String>,
                  api_token: Option<String>,
                  query_params: ApiKeyQueryParams,
                  body: Bytes| {
                let events = source
                    .decode(&encoding_header, body, path.as_str())
                    .and_then(|body| {
                        decode_llmobs_body(
                            body,
                            source.api_key_extractor.extract(
                                path.as_str(),
                                api_token,
                                query_params.dd_api_key,
                            ),
                        )
                    });
                handler.clone().handle_request(events, super::LLMOBS)
            },
        )
        .boxed()
}
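
The P1 thread asks for the proxied path `/evp_proxy/v2/api/v2/llmobs` to be accepted alongside `/api/v2/llmobs`; the author's fix registers a second warp filter and combines the two with `.or().unify()`. The matching rule itself can be sketched without warp as a segment-prefix check. `is_llmobs_route` is a hypothetical helper for illustration, not Vector code, and it assumes the trailing `..` in `path!` lets extra segments follow the prefix:

```rust
/// Hypothetical helper (not part of Vector): reports whether a request path
/// targets the LLMObs intake, either directly or through the Agent's EVP proxy.
fn is_llmobs_route(path: &str) -> bool {
    // Direct intake path, and the proxied path that agent-mode SDKs use.
    const DIRECT: &[&str] = &["api", "v2", "llmobs"];
    const EVP_PROXY: &[&str] = &["evp_proxy", "v2", "api", "v2", "llmobs"];

    // Split into segments; this sketch drops the empty segment that a
    // leading slash produces.
    let segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();

    // Like `path!(... / ..)`, a route matches when it is a prefix of the
    // request path, so trailing segments are allowed.
    [DIRECT, EVP_PROXY]
        .iter()
        .any(|route| segments.starts_with(route))
}
```

Checking both prefixes in one place mirrors the idea behind the fix: one handler, two accepted entry points.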

#[derive(Deserialize)]
struct LLMObsEnvelopeItem {
    #[serde(rename = "event_type")]
    _event_type: Option<String>,
    spans: Vec<LLMObsSpan>,
    #[serde(rename = "_dd.tracer_version")]
    dd_tracer_version: Option<String>,
    #[serde(rename = "_dd.scope")]
    _dd_scope: Option<String>,
}
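
The request body is parsed as a JSON array of these envelope items. Each item groups a `spans` array with envelope-level metadata such as `_dd.tracer_version`, and `decode_llmobs_body` emits one log per span, copying the tracer version onto each. A std-only sketch of just that flattening step, using simplified, hypothetical types (`EnvelopeItem`, `Span`, and `FlatSpan` are illustrative stand-ins, not Vector types):

```rust
/// Simplified stand-in for an envelope item: only the fields the
/// flattening step needs.
struct EnvelopeItem {
    tracer_version: Option<String>,
    spans: Vec<Span>,
}

/// Simplified stand-in for a span.
struct Span {
    span_id: String,
    trace_id: String,
}

/// One flattened record per span (a stand-in for a `LogEvent`).
#[derive(Debug, PartialEq)]
struct FlatSpan {
    span_id: String,
    trace_id: String,
    tracer_version: Option<String>,
}

fn flatten(envelope: Vec<EnvelopeItem>) -> Vec<FlatSpan> {
    envelope
        .into_iter()
        .flat_map(|item| {
            // Move the item-level tracer version into the inner closure,
            // then clone it once per span.
            let tracer_version = item.tracer_version;
            item.spans.into_iter().map(move |span| FlatSpan {
                span_id: span.span_id,
                trace_id: span.trace_id,
                tracer_version: tracer_version.clone(),
            })
        })
        .collect()
}
```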

#[derive(Deserialize)]
struct LLMObsSpan {
    span_id: String,
    trace_id: String,
    parent_id: Option<String>,
    name: Option<String>,
    session_id: Option<String>,
    service: Option<String>,
    start_ns: Option<i64>,
    duration: Option<i64>,
    status: Option<String>,
    status_message: Option<String>,
    meta: Option<Value>,
    metrics: Option<Value>,
    #[serde(default)]
    tags: Vec<String>,
    #[serde(rename = "_dd")]
    dd: Option<Value>,
}
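
`start_ns` is deserialized as a signed count of nanoseconds. A P2 review comment on this file asks for it to become the event timestamp, and the author's fix uses chrono's `Utc.timestamp_nanos(ns)`. A std-only sketch of the same conversion, assuming (as that fix implies) that `start_ns` counts nanoseconds since the Unix epoch; `start_ns_to_system_time` is a hypothetical name:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: converts a span's `start_ns` (signed nanoseconds
/// since the Unix epoch, by assumption) into a `SystemTime`, a std analogue
/// of chrono's `Utc.timestamp_nanos(ns)`.
fn start_ns_to_system_time(start_ns: i64) -> SystemTime {
    // `unsigned_abs` avoids the overflow that `abs` would hit on `i64::MIN`.
    let offset = Duration::from_nanos(start_ns.unsigned_abs());
    if start_ns >= 0 {
        UNIX_EPOCH + offset
    } else {
        // Pre-epoch values count backwards from the epoch.
        UNIX_EPOCH - offset
    }
}
```

`SystemTime` resolution is platform-dependent (100 ns on Windows), whereas chrono's `DateTime<Utc>` keeps full nanoseconds, which is one reason to prefer chrono inside Vector.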

pub(crate) fn decode_llmobs_body(
    body: Bytes,
    api_key: Option<Arc<str>>,
) -> Result<Vec<Event>, ErrorMessage> {
    let envelope: Vec<LLMObsEnvelopeItem> = serde_json::from_slice(&body).map_err(|error| {
        emit!(DatadogAgentJsonParseError { error: &error });
        ErrorMessage::new(
            StatusCode::BAD_REQUEST,
            format!("Error parsing JSON: {error:?}"),
        )
    })?;

    let events = envelope
        .into_iter()
        .flat_map(|item| {
            let tracer_version = item.dd_tracer_version.clone();
            item.spans.into_iter().map(move |span| {
                let mut log = LogEvent::default();
                log.insert("span_id", span.span_id);
                log.insert("trace_id", span.trace_id);
                if let Some(v) = span.parent_id {
                    log.insert("parent_id", v);
                }
                if let Some(v) = span.name {
                    log.insert("name", v);
                }
                if let Some(v) = span.session_id {
                    log.insert("session_id", v);
                }
                if let Some(v) = span.service {
                    log.insert("service", v);
                }
                if let Some(v) = span.start_ns {
                    log.insert("start_ns", v);
                }

P2: Use the span start time as the log timestamp

LLMObs spans carry their actual event time in nanoseconds in `start_ns`, but the emitted log stores that value only as an ordinary integer and never uses it as the log timestamp. Once standard source metadata is added, these events are timestamped at Vector ingestion time instead, so delayed or buffered spans reach time-aware log sinks with the wrong time. Convert `start_ns` and use it as the event timestamp.

Author reply:

Fixed in a4f25b6. When `start_ns` is present, `Utc.timestamp_nanos(ns)` is now inserted at `log_schema().timestamp_key_target_path()`, in addition to storing the raw integer value.

                if let Some(v) = span.duration {
                    log.insert("duration", v);
                }
                if let Some(v) = span.status {
                    log.insert("status", v);
                }
                if let Some(v) = span.status_message {
                    log.insert("status_message", v);
                }
                if let Some(v) = span.meta {
                    log.insert("meta", v);
                }
                if let Some(v) = span.metrics {
                    log.insert("metrics", v);
                }
                if !span.tags.is_empty() {
                    log.insert("tags", span.tags);
                }
                if let Some(ml_app) = span
                    .dd
                    .as_ref()
                    .and_then(|dd| dd.get("ml_app"))
                    .and_then(|v| v.as_str())
                {
                    log.insert("ml_app", ml_app.to_owned());
                }
                if let Some(v) = tracer_version.clone() {
                    log.insert("_dd.tracer_version", v);
                }
                Event::Log(log)
            })
        })
        .map(|mut event| {
            if let Some(k) = &api_key {
                event.metadata_mut().set_datadog_api_key(Arc::clone(k));
            }
            event
        })
        .collect();

    Ok(events)
Comment on lines +235 to +241

P2: Report received LLMObs events

After decoding succeeds, this function returns the events without emitting through `source.events_received`, whereas the log, metric, and trace decoders all emit `CountByteSize` through that registered handle. Deployments receiving LLMObs traffic will therefore under-report `component_received_events_total` and the received-event byte metrics; an LLMObs-only source may show zero received events despite successfully forwarding data.

Author reply:

Fixed in a4f25b6. `CountByteSize` is now emitted through `source.events_received` after collecting all events, using `EstimatedJsonEncodedSizeOf` to accumulate the byte size, matching the pattern in the other decoders.

}
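
The P2 fix counts events and sums their estimated size once, after all spans are collected. A std-only sketch of that accumulation, in which `ReceivedTally` and `tally_received` are hypothetical stand-ins and each event is represented by its JSON text, so its "size" is simply the text's byte length (an assumption in place of `EstimatedJsonEncodedSizeOf`, which estimates the JSON-encoded size of Vector's event types):

```rust
/// Hypothetical stand-in for the count/byte pair that `CountByteSize`
/// reports: how many events were received and their size in bytes.
#[derive(Debug, Default, PartialEq)]
struct ReceivedTally {
    count: usize,
    bytes: usize,
}

/// Tallies already-decoded events in a single pass after collection.
/// Each event is its JSON text here, and its size is the text's length.
fn tally_received(events: &[String]) -> ReceivedTally {
    events.iter().fold(ReceivedTally::default(), |mut acc, event| {
        acc.count += 1;
        acc.bytes += event.len();
        acc
    })
}
```

In the source itself, the resulting totals would be emitted through the registered `source.events_received` handle, as the other decoders do.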
src/sources/datadog_agent/mod.rs: 21 changes (20 additions, 1 deletion)
@@ -6,6 +6,7 @@ mod tests;
pub mod logs;
pub mod metrics;
pub mod traces;
pub mod llmobs;

#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddmetric_proto {
@@ -69,6 +70,7 @@ use crate::{
pub const LOGS: &str = "logs";
pub const METRICS: &str = "metrics";
pub const TRACES: &str = "traces";
pub const LLMOBS: &str = "llmobs";

/// Configuration for the `datadog_agent` source.
#[configurable_component(source(
@@ -106,6 +108,11 @@ pub struct DatadogAgentConfig {
    #[serde(default = "crate::serde::default_false")]
    disable_traces: bool,

    /// If this is set to `true`, LLM Observability events are not accepted by the component.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_llmobs: bool,

    /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs.
    ///
    ///
@@ -179,6 +186,7 @@ impl GenerateConfig for DatadogAgentConfig {
            disable_logs: false,
            disable_metrics: false,
            disable_traces: false,
            disable_llmobs: false,
            multiple_outputs: false,
            parse_ddtags: false,
            split_metric_namespace: true,
@@ -322,6 +330,7 @@ impl SourceConfig for DatadogAgentConfig {
            .with_standard_vector_source_metadata();

        let mut output = Vec::with_capacity(1);
        let llmobs_definition = definition.clone();

        if self.multiple_outputs {
            if !self.disable_logs {
@@ -333,6 +342,9 @@ impl SourceConfig for DatadogAgentConfig {
            if !self.disable_traces {
                output.push(SourceOutput::new_traces().with_port(TRACES))
            }
            if !self.disable_llmobs {
                output.push(SourceOutput::new_maybe_logs(DataType::Log, llmobs_definition).with_port(LLMOBS))
            }
        } else {
            output.push(SourceOutput::new_maybe_logs(
                DataType::all_bits(),
Comment on lines 387 to 388

P2: Include LLMObs in the default output schema

When `multiple_outputs` is left at its default of `false`, LLMObs requests are accepted and routed through the default output, but this branch still advertises only the ordinary log decoder schema. With schema validation enabled, downstream components that read from `agent` cannot statically access fields such as `span_id`/`trace_id`, and may still require log fields like `message`, even though this output now carries LLMObs log events. Merge the LLMObs schema into the default output, or relax the schema when LLMObs is enabled.
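
The routing produced by the two flags in this diff can be summarized as a small decision: `disable_llmobs` decides whether the route and port exist at all, and `multiple_outputs` decides between the `llmobs` port and the shared default output. `LlmobsRouting` and `llmobs_routing` are hypothetical names used only for illustration:

```rust
/// Where LLMObs events end up, derived from the config flags in this diff.
#[derive(Debug, PartialEq)]
enum LlmobsRouting {
    /// `disable_llmobs = true`: no LLMObs filter is registered.
    NotAccepted,
    /// `multiple_outputs = true`: events go to the named output port.
    NamedPort(&'static str),
    /// `multiple_outputs = false`: events share the default output.
    DefaultOutput,
}

/// Hypothetical helper mirroring the output construction and the
/// filter registration shown in this diff.
fn llmobs_routing(multiple_outputs: bool, disable_llmobs: bool) -> LlmobsRouting {
    if disable_llmobs {
        LlmobsRouting::NotAccepted
    } else if multiple_outputs {
        LlmobsRouting::NamedPort("llmobs")
    } else {
        LlmobsRouting::DefaultOutput
    }
}
```

With `multiple_outputs = true`, downstream components would select these events through the named output (for example `<source_id>.llmobs` in their `inputs`); in the default case, LLMObs events share the default output and its advertised schema.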

@@ -459,12 +471,19 @@ impl DatadogAgentSource {
        }

        if !config.disable_metrics {
            let metrics_filter = metrics::build_warp_filter(handler, self.clone());
            let metrics_filter = metrics::build_warp_filter(handler.clone(), self.clone());
            filters = filters
                .map(|f| f.or(metrics_filter.clone()).unify().boxed())
                .or(Some(metrics_filter));
        }

        if !config.disable_llmobs {
            let llmobs_filter = llmobs::build_warp_filter(handler.clone(), self.clone());
            filters = filters
                .map(|f| f.or(llmobs_filter.clone()).unify().boxed())
                .or(Some(llmobs_filter));
        }

        filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
    }
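
The `filters` accumulation starts as `None`. Each enabled data type either extends the combined filter with `.or(...)` or, if it is the first, becomes the filter itself, and `ok_or_else` turns a remaining `None` into an error. A std-only model of that pattern with routes represented by name; `combine_routes` is hypothetical and only mimics the shape of the warp code:

```rust
/// Hypothetical model of how the source assembles its routes: each enabled
/// data type contributes one route, and `Option` records whether any exists.
fn combine_routes(candidates: &[(&'static str, bool)]) -> Result<Vec<&'static str>, String> {
    let mut filters: Option<Vec<&'static str>> = None;
    for &(route, enabled) in candidates {
        if !enabled {
            continue;
        }
        // Same shape as `filters.map(|f| f.or(x).unify().boxed()).or(Some(x))`:
        // extend the existing combination, or start one if none exists yet.
        filters = filters
            .map(|mut combined| {
                combined.push(route);
                combined
            })
            .or(Some(vec![route]));
    }
    // With every data type disabled there is nothing to serve.
    filters.ok_or_else(|| "no data type is enabled".to_string())
}
```

Because warp's `or` tries the left-hand filter first and falls back to the right-hand one on rejection, registration order is also the order in which routes are tried; the `Vec` order here reflects that.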
