Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 132 additions & 2 deletions lading/src/blackhole/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto,
};
use lading_capture::{counter_incr, gauge_set};
use lading_capture::{counter_incr, gauge_set, histogram};
use metrics::counter;
use prost::Message;
use serde::{Deserialize, Serialize};
Expand All @@ -45,7 +45,7 @@ use tokio::net::TcpListener;
use tracing::{debug, error, info, trace, warn};

use super::General;
use crate::proto::datadog::intake::metrics::MetricPayload;
use crate::proto::datadog::intake::metrics::{MetricPayload, SketchPayload};

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`Datadog`].
Expand Down Expand Up @@ -243,6 +243,9 @@ async fn handle_request(
("/api/v2/series", "application/x-protobuf") => {
handle_v2_protobuf(&whole_body, content_encoding, &path, labels).await
}
("/api/v2/sketches", "application/x-protobuf") => {
handle_v2_sketches_protobuf(&whole_body, content_encoding, &path, labels).await
}
_ => StatusCode::ACCEPTED,
};

Expand Down Expand Up @@ -396,6 +399,108 @@ async fn handle_v2_protobuf(
StatusCode::ACCEPTED
}

async fn handle_v2_sketches_protobuf(
body: &[u8],
content_encoding: &str,
path: &str,
labels: &[(String, String)],
) -> StatusCode {
let decompressed = match decompress_if_needed(body, content_encoding) {
Ok(data) => data,
Err(e) => {
warn!(
"Failed to decompress body for path={path}, encoding={content_encoding}, body_size={}: {e}",
body.len()
);
return StatusCode::BAD_REQUEST;
}
};

match SketchPayload::decode(&decompressed[..]) {
Ok(payload) => {
trace!(
"Parsed protobuf SketchPayload: {} sketches",
payload.sketches.len()
);

for sketch in &payload.sketches {
// Warn if legacy Distribution format is present; modern agents use Dogsketch.
if !sketch.distributions.is_empty() {
warn!(
"Sketch {} has {} legacy Distribution entries (not supported, ignoring)",
sketch.metric,
sketch.distributions.len()
);
}

// Parse Datadog tags (format: "key:value" or "key") into label pairs.
// Key-only tags are represented with an empty value.
let tag_pairs: Vec<(&str, &str)> = sketch
.tags
.iter()
.map(|tag| tag.split_once(':').unwrap_or((tag.as_str(), "")))
.collect();

for dogsketch in &sketch.dogsketches {
if dogsketch.cnt == 0 {
continue;
}
let timestamp = unix_to_instant(dogsketch.ts);

// Record min and max as explicit samples to preserve the range.
for &boundary in &[dogsketch.min, dogsketch.max] {
if let Err(e) =
histogram(&sketch.metric, &tag_pairs, boundary, timestamp).await
{
warn!(
"Failed to record sketch boundary for {}: {e}",
sketch.metric
);
}
}

// Expand DDSketch buckets into approximate samples.
// Each k/n pair represents n samples whose value falls in the bucket
// indexed by k. We emit n calls to histogram() at the bucket's
// representative value (geometric midpoint).
for (ki, ni) in dogsketch.k.iter().zip(dogsketch.n.iter()) {
let value = ddsketch_key_to_value(*ki);
for _ in 0..*ni {
if let Err(e) =
histogram(&sketch.metric, &tag_pairs, value, timestamp).await
{
warn!("Failed to record sketch sample for {}: {e}", sketch.metric);
break;
}
}
}
}
}

counter!("datadog_intake_payloads_parsed", labels).increment(1);
}
Err(e) => {
warn!(
"Failed to parse sketch protobuf for path={path}, decompressed_size={}: {e}",
decompressed.len()
);
counter!("datadog_intake_parse_failures", labels).increment(1);
}
}
StatusCode::ACCEPTED
}

/// Convert a `DDSketch` bucket index to its representative value.
///
/// Uses the default Datadog agent mapping: logarithmic with relative
/// accuracy 0.01 (gamma ≈ 1.0202). The representative value is the
/// geometric midpoint of the bucket's interval `[gamma^(k-1), gamma^k]`.
fn ddsketch_key_to_value(k: i32) -> f64 {
const RELATIVE_ACCURACY: f64 = 0.01;
let gamma = (1.0 + RELATIVE_ACCURACY) / (1.0 - RELATIVE_ACCURACY);
gamma.powi(k) / gamma.sqrt()
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -408,4 +513,29 @@ v2:
"#;
let _config: Config = serde_yaml::from_str(yaml).unwrap();
}

#[test]
fn ddsketch_key_to_value_monotone() {
// Values must be strictly increasing as k increases.
let values: Vec<f64> = (-5..=5).map(ddsketch_key_to_value).collect();
for w in values.windows(2) {
assert!(w[1] > w[0], "expected monotone increase: {w:?}");
}
}

#[test]
fn ddsketch_key_to_value_positive() {
// All bucket values must be positive.
for k in -100..=100_i32 {
let v = ddsketch_key_to_value(k);
assert!(v > 0.0, "k={k} gave non-positive value {v}");
}
}

#[test]
fn ddsketch_key_to_value_k0_near_one() {
// k=0 maps to 1/sqrt(gamma). With gamma≈1.0202 this is just below 1.0.
let v = ddsketch_key_to_value(0);
assert!(v > 0.98 && v < 1.0, "k=0 value out of expected range: {v}");
}
}
Loading