Skip to content

Commit 2e6decc

Browse files
committed
handle JSON encoded otlp in http endpoints
Signed-off-by: Brian L. Troutwine <brian.troutwine@datadoghq.com>
1 parent befba08 commit 2e6decc

File tree

4 files changed

+179
-48
lines changed

4 files changed

+179
-48
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lading/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ nix = { version = "0.30", default-features = false, features = [
5555
num_cpus = { version = "1.16" }
5656
num-traits = { version = "0.2", default-features = false }
5757
once_cell = { workspace = true }
58-
opentelemetry-proto = { version = "0.30.0", features = ["gen-tonic", "logs", "metrics", "trace"] }
58+
opentelemetry-proto = { version = "0.30.0", features = ["gen-tonic", "logs", "metrics", "trace", "with-serde"] }
5959
prost = { workspace = true }
6060
rand = { workspace = true, default-features = false, features = [
6161
"small_rng",

lading/src/blackhole/otlp/http.rs

Lines changed: 175 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -54,36 +54,65 @@ pub(crate) fn run_server(
5454
})
5555
}
5656

57+
/// Response format for OTLP HTTP
58+
#[derive(Clone, Copy, Debug)]
59+
enum ResponseFormat {
60+
Proto,
61+
Json,
62+
}
63+
5764
/// Handler for OTLP HTTP requests
5865
#[derive(Clone, Debug)]
5966
struct OtlpHttpHandler {
6067
labels: Vec<(String, String)>,
61-
empty_metrics_response: Bytes,
62-
empty_traces_response: Bytes,
63-
empty_logs_response: Bytes,
64-
content_type_header_value: http::HeaderValue,
68+
empty_metrics_response_proto: Bytes,
69+
empty_traces_response_proto: Bytes,
70+
empty_logs_response_proto: Bytes,
71+
empty_metrics_response_json: Bytes,
72+
empty_traces_response_json: Bytes,
73+
empty_logs_response_json: Bytes,
74+
content_type_proto: http::HeaderValue,
75+
content_type_json: http::HeaderValue,
6576
response_delay: Duration,
6677
}
6778

6879
impl OtlpHttpHandler {
6980
fn new(response_delay: Duration, labels: &[(String, String)]) -> Self {
70-
// Pre-compute empty responses
71-
let empty_metrics_response =
72-
Bytes::from(ExportMetricsServiceResponse::default().encode_to_vec());
73-
let empty_traces_response =
74-
Bytes::from(ExportTraceServiceResponse::default().encode_to_vec());
75-
let empty_logs_response = Bytes::from(ExportLogsServiceResponse::default().encode_to_vec());
76-
77-
let content_type_header_value = "application/x-protobuf"
81+
// Pre-compute empty responses for both formats
82+
let empty_metrics = ExportMetricsServiceResponse::default();
83+
let empty_traces = ExportTraceServiceResponse::default();
84+
let empty_logs = ExportLogsServiceResponse::default();
85+
86+
let empty_metrics_response_proto = Bytes::from(empty_metrics.encode_to_vec());
87+
let empty_traces_response_proto = Bytes::from(empty_traces.encode_to_vec());
88+
let empty_logs_response_proto = Bytes::from(empty_logs.encode_to_vec());
89+
let empty_metrics_response_json = Bytes::from(
90+
serde_json::to_vec(&empty_metrics).expect("Failed to serialize empty metrics response"),
91+
);
92+
let empty_traces_response_json = Bytes::from(
93+
serde_json::to_vec(&empty_traces).expect("Failed to serialize empty traces response"),
94+
);
95+
let empty_logs_response_json = Bytes::from(
96+
serde_json::to_vec(&empty_logs).expect("Failed to serialize empty logs response"),
97+
);
98+
99+
let content_type_proto = "application/x-protobuf"
78100
.parse()
79101
.expect("application/x-protobuf is a valid MIME type");
102+
let content_type_json = "application/json"
103+
.parse()
104+
.expect("application/json is a valid MIME type");
80105

81106
Self {
82107
labels: labels.to_vec(),
83-
empty_metrics_response,
84-
empty_traces_response,
85-
empty_logs_response,
86-
content_type_header_value,
108+
empty_metrics_response_proto,
109+
empty_traces_response_proto,
110+
empty_logs_response_proto,
111+
empty_metrics_response_json,
112+
empty_traces_response_json,
113+
empty_logs_response_json,
114+
content_type_proto,
115+
content_type_json,
87116
response_delay,
88117
}
89118
}
@@ -92,6 +121,7 @@ impl OtlpHttpHandler {
92121
async fn build_response(
93122
&self,
94123
response_bytes: Bytes,
124+
content_type: &http::HeaderValue,
95125
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
96126
if self.response_delay.as_micros() > 0 {
97127
tokio::time::sleep(self.response_delay).await;
@@ -101,10 +131,7 @@ impl OtlpHttpHandler {
101131
let headers = response
102132
.headers_mut()
103133
.expect("Response builder should always provide headers_mut");
104-
headers.insert(
105-
hyper::header::CONTENT_TYPE,
106-
self.content_type_header_value.clone(),
107-
);
134+
headers.insert(hyper::header::CONTENT_TYPE, content_type.clone());
108135
Ok(response
109136
.body(crate::full(response_bytes))
110137
.expect("Creating HTTP response should not fail"))
@@ -126,61 +153,127 @@ impl OtlpHttpHandler {
126153

127154
counter!("requests_received", &self.labels).increment(1);
128155

156+
let request_format = req
157+
.headers()
158+
.get(hyper::header::CONTENT_TYPE)
159+
.and_then(|ct| ct.to_str().ok())
160+
.map(|ct| {
161+
if ct.contains("application/json") {
162+
ResponseFormat::Json
163+
} else {
164+
ResponseFormat::Proto
165+
}
166+
})
167+
.unwrap_or(ResponseFormat::Proto);
168+
let response_format = req
169+
.headers()
170+
.get(hyper::header::ACCEPT)
171+
.and_then(|accept| accept.to_str().ok())
172+
.map(|accept| {
173+
if accept.contains("application/json") {
174+
ResponseFormat::Json
175+
} else {
176+
ResponseFormat::Proto
177+
}
178+
})
179+
.unwrap_or(request_format);
180+
129181
// Check for empty bodies using Content-Length when available
130182
if let Some(content_length) = req.headers().get(hyper::header::CONTENT_LENGTH) {
131183
if let Ok(length) = content_length.to_str() {
132184
if let Ok(length) = length.parse::<u64>() {
133185
if length == 0 {
134186
counter!("bytes_received", &self.labels).increment(0);
135187

136-
let response_bytes = match path_ref {
137-
"/v1/metrics" => self.empty_metrics_response.clone(),
138-
"/v1/traces" => self.empty_traces_response.clone(),
139-
"/v1/logs" => self.empty_logs_response.clone(),
140-
_ => unreachable!(), // checked earlier
188+
let (response_bytes, content_type) = match (path_ref, response_format) {
189+
("/v1/metrics", ResponseFormat::Json) => (
190+
self.empty_metrics_response_json.clone(),
191+
&self.content_type_json,
192+
),
193+
("/v1/metrics", ResponseFormat::Proto) => (
194+
self.empty_metrics_response_proto.clone(),
195+
&self.content_type_proto,
196+
),
197+
("/v1/traces", ResponseFormat::Json) => (
198+
self.empty_traces_response_json.clone(),
199+
&self.content_type_json,
200+
),
201+
("/v1/traces", ResponseFormat::Proto) => (
202+
self.empty_traces_response_proto.clone(),
203+
&self.content_type_proto,
204+
),
205+
("/v1/logs", ResponseFormat::Json) => (
206+
self.empty_logs_response_json.clone(),
207+
&self.content_type_json,
208+
),
209+
("/v1/logs", ResponseFormat::Proto) => (
210+
self.empty_logs_response_proto.clone(),
211+
&self.content_type_proto,
212+
),
213+
_ => unreachable!(), // path already validated
141214
};
142215

143-
return self.build_response(response_bytes).await;
216+
return self.build_response(response_bytes, content_type).await;
144217
}
145218
}
146219
}
147220
}
148221

222+
// Non-empty body, implies a little more CPU work
149223
let content_encoding = req.headers().get(hyper::header::CONTENT_ENCODING).cloned();
150224
let path = path_ref.to_string();
151225
let (_, body) = req.into_parts();
152226

153227
let body_bytes = body.collect().await?.to_bytes();
154228

155229
counter!("bytes_received", &self.labels).increment(body_bytes.len() as u64);
230+
156231
let response_bytes =
157232
match crate::codec::decode(content_encoding.as_ref(), body_bytes.clone()) {
158233
Ok(decoded) => {
159234
counter!("decoded_bytes_received", &self.labels)
160235
.increment(decoded.len() as u64);
161236

237+
let is_json = matches!(request_format, ResponseFormat::Json);
162238
match path.as_str() {
163-
"/v1/metrics" => self.process_metrics(&decoded),
164-
"/v1/traces" => self.process_traces(&decoded),
165-
"/v1/logs" => self.process_logs(&decoded),
166-
_ => unreachable!(
167-
"path previously checked, catastrophic programming mistake"
168-
),
239+
"/v1/metrics" => self.process_metrics(&decoded, is_json, &response_format),
240+
"/v1/traces" => self.process_traces(&decoded, is_json, &response_format),
241+
"/v1/logs" => self.process_logs(&decoded, is_json, &response_format),
242+
_ => unreachable!("path already validated"),
169243
}
170244
}
171245
Err(response) => return Ok(response),
172246
};
173247

174-
self.build_response(response_bytes).await
248+
let content_type = match response_format {
249+
ResponseFormat::Json => &self.content_type_json,
250+
ResponseFormat::Proto => &self.content_type_proto,
251+
};
252+
253+
self.build_response(response_bytes, content_type).await
175254
}
176255

177-
fn process_metrics(&self, body_bytes: &[u8]) -> Bytes {
256+
fn process_metrics(
257+
&self,
258+
body_bytes: &[u8],
259+
is_json: bool,
260+
response_format: &ResponseFormat,
261+
) -> Bytes {
178262
if body_bytes.is_empty() {
179-
return self.empty_metrics_response.clone();
263+
return match response_format {
264+
ResponseFormat::Json => self.empty_metrics_response_json.clone(),
265+
ResponseFormat::Proto => self.empty_metrics_response_proto.clone(),
266+
};
180267
}
181268

182269
let mut total_points: u64 = 0;
183-
if let Ok(request) = ExportMetricsServiceRequest::decode(body_bytes) {
270+
let request_opt = if is_json {
271+
serde_json::from_slice::<ExportMetricsServiceRequest>(body_bytes).ok()
272+
} else {
273+
ExportMetricsServiceRequest::decode(body_bytes).ok()
274+
};
275+
276+
if let Some(request) = request_opt {
184277
for rm in &request.resource_metrics {
185278
for sm in &rm.scope_metrics {
186279
for m in &sm.metrics {
@@ -203,16 +296,33 @@ impl OtlpHttpHandler {
203296
counter!("data_points_received", &self.labels).increment(total_points);
204297
}
205298

206-
self.empty_metrics_response.clone()
299+
match response_format {
300+
ResponseFormat::Json => self.empty_metrics_response_json.clone(),
301+
ResponseFormat::Proto => self.empty_metrics_response_proto.clone(),
302+
}
207303
}
208304

209-
fn process_traces(&self, body_bytes: &[u8]) -> Bytes {
305+
fn process_traces(
306+
&self,
307+
body_bytes: &[u8],
308+
is_json: bool,
309+
response_format: &ResponseFormat,
310+
) -> Bytes {
210311
if body_bytes.is_empty() {
211-
return self.empty_traces_response.clone();
312+
return match response_format {
313+
ResponseFormat::Json => self.empty_traces_response_json.clone(),
314+
ResponseFormat::Proto => self.empty_traces_response_proto.clone(),
315+
};
212316
}
213317

214318
let mut total_spans: u64 = 0;
215-
if let Ok(request) = ExportTraceServiceRequest::decode(body_bytes) {
319+
let request_opt = if is_json {
320+
serde_json::from_slice::<ExportTraceServiceRequest>(body_bytes).ok()
321+
} else {
322+
ExportTraceServiceRequest::decode(body_bytes).ok()
323+
};
324+
325+
if let Some(request) = request_opt {
216326
for rs in &request.resource_spans {
217327
for ss in &rs.scope_spans {
218328
let spans_count = ss.spans.len() as u64;
@@ -225,16 +335,33 @@ impl OtlpHttpHandler {
225335
counter!("data_points_received", &self.labels).increment(total_spans);
226336
}
227337

228-
self.empty_traces_response.clone()
338+
match response_format {
339+
ResponseFormat::Json => self.empty_traces_response_json.clone(),
340+
ResponseFormat::Proto => self.empty_traces_response_proto.clone(),
341+
}
229342
}
230343

231-
fn process_logs(&self, body_bytes: &[u8]) -> Bytes {
344+
fn process_logs(
345+
&self,
346+
body_bytes: &[u8],
347+
is_json: bool,
348+
response_format: &ResponseFormat,
349+
) -> Bytes {
232350
if body_bytes.is_empty() {
233-
return self.empty_logs_response.clone();
351+
return match response_format {
352+
ResponseFormat::Json => self.empty_logs_response_json.clone(),
353+
ResponseFormat::Proto => self.empty_logs_response_proto.clone(),
354+
};
234355
}
235356

236357
let mut total_logs: u64 = 0;
237-
if let Ok(request) = ExportLogsServiceRequest::decode(body_bytes) {
358+
let request_opt = if is_json {
359+
serde_json::from_slice::<ExportLogsServiceRequest>(body_bytes).ok()
360+
} else {
361+
ExportLogsServiceRequest::decode(body_bytes).ok()
362+
};
363+
364+
if let Some(request) = request_opt {
238365
for rl in &request.resource_logs {
239366
for sl in &rl.scope_logs {
240367
let logs_count = sl.log_records.len() as u64;
@@ -247,6 +374,9 @@ impl OtlpHttpHandler {
247374
counter!("data_points_received", &self.labels).increment(total_logs);
248375
}
249376

250-
self.empty_logs_response.clone()
377+
match response_format {
378+
ResponseFormat::Json => self.empty_logs_response_json.clone(),
379+
ResponseFormat::Proto => self.empty_logs_response_proto.clone(),
380+
}
251381
}
252382
}

lading_payload/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ opentelemetry-proto = { version = "0.30.0", features = [
2020
"metrics",
2121
"logs",
2222
"gen-tonic",
23+
"with-serde",
2324
] }
2425
prost = { workspace = true }
2526
rand = { workspace = true, default-features = false, features = [

0 commit comments

Comments
 (0)