Skip to content

Commit b719863

Browse files
Dig-Dougclaude
andcommitted
Add observation hierarchies with groups and multi-part payloads
Replace flat string labels with first-class Group types that form hierarchical containers for observations. Groups are observations themselves (ObservationType::Group) with parent pointers, enabling tree structures like request/response hierarchies. Add multi-part payload support via ObservationPayloadHandle, allowing observations to accumulate named payloads over time (e.g., request then response). Key changes: - GroupId type, GroupBuilder, GroupHandle, SendGroup in shared/client - group! macro for creating groups - ObservationBuilder: .label()/.labels() replaced with .group(&GroupHandle) - Named payload methods: .named_payload(), .named_debug(), .named_raw_payload() - AdditionalPayload uploader message for streaming named payloads - Tracing layer uses GroupHandle::from_id with deterministic span ID mapping - Axum request observer creates group hierarchies per-request - All tests updated, server templates updated Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a2dd0b6 commit b719863

File tree

22 files changed

+823
-124
lines changed

22 files changed

+823
-124
lines changed

crates/observation-tools-client/openapi.json

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@
105105
],
106106
"type": "object"
107107
},
108+
"GroupId": {
109+
"description": "Unique identifier for a group (UUIDv7)",
110+
"example": "018e9a3a2c1b7e3f8d2a4b5c6d7e8f9b",
111+
"type": "string"
112+
},
108113
"ListExecutionsResponse": {
109114
"description": "Response for listing executions",
110115
"properties": {
@@ -170,17 +175,17 @@
170175
"$ref": "#/components/schemas/ExecutionId",
171176
"description": "ID of the execution this observation belongs to"
172177
},
173-
"id": {
174-
"$ref": "#/components/schemas/ObservationId",
175-
"description": "Unique identifier for this observation"
176-
},
177-
"labels": {
178-
"description": "Hierarchical labels for grouping observations\nUses path convention (e.g., \"api/request/headers\")",
178+
"group_ids": {
179+
"description": "IDs of groups this observation belongs to",
179180
"items": {
180-
"type": "string"
181+
"$ref": "#/components/schemas/GroupId"
181182
},
182183
"type": "array"
183184
},
185+
"id": {
186+
"$ref": "#/components/schemas/ObservationId",
187+
"description": "Unique identifier for this observation"
188+
},
184189
"log_level": {
185190
"$ref": "#/components/schemas/LogLevel",
186191
"description": "Log level for this observation"
@@ -207,6 +212,11 @@
207212
"$ref": "#/components/schemas/ObservationType",
208213
"description": "Type of observation"
209214
},
215+
"parent_group_id": {
216+
"$ref": "#/components/schemas/GroupId",
217+
"description": "Parent group ID (used when observation_type == Group)",
218+
"nullable": true
219+
},
210220
"parent_span_id": {
211221
"description": "Parent span ID (for tracing integration)",
212222
"nullable": true,
@@ -245,7 +255,8 @@
245255
"enum": [
246256
"LogEntry",
247257
"Payload",
248-
"Span"
258+
"Span",
259+
"Group"
249260
],
250261
"type": "string"
251262
},

crates/observation-tools-client/src/axum/request_observer.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::context;
22
use crate::execution::ExecutionHandle;
3+
use crate::group::GroupBuilder;
4+
use crate::group::GroupHandle;
35
use crate::observation::ObservationBuilder;
46
use axum::body::Body;
57
use axum::extract::Request;
@@ -37,6 +39,8 @@ struct StreamingObserverState {
3739
log_level: LogLevel,
3840
status_code: u16,
3941
execution: ExecutionHandle,
42+
response_group: GroupHandle,
43+
response_body_group: GroupHandle,
4044
}
4145

4246
impl StreamingObserverState {
@@ -45,13 +49,17 @@ impl StreamingObserverState {
4549
log_level: LogLevel,
4650
status_code: u16,
4751
execution: ExecutionHandle,
52+
response_group: GroupHandle,
53+
response_body_group: GroupHandle,
4854
) -> Self {
4955
Self {
5056
buffer: BytesMut::new(),
5157
content_type,
5258
log_level,
5359
status_code,
5460
execution,
61+
response_group,
62+
response_body_group,
5563
}
5664
}
5765

@@ -74,8 +82,8 @@ impl Drop for StreamingObserverState {
7482
};
7583

7684
ObservationBuilder::new("http/response/body")
77-
.label("http/response")
78-
.label("http/response/body")
85+
.group(&self.response_group)
86+
.group(&self.response_body_group)
7987
.metadata("status", &self.status_code.to_string())
8088
.log_level(self.log_level)
8189
.payload_with_execution(payload, &self.execution);
@@ -99,6 +107,8 @@ impl StreamingObserverBody {
99107
log_level: LogLevel,
100108
status_code: u16,
101109
execution: ExecutionHandle,
110+
response_group: GroupHandle,
111+
response_body_group: GroupHandle,
102112
) -> Self {
103113
Self {
104114
inner,
@@ -107,6 +117,8 @@ impl StreamingObserverBody {
107117
log_level,
108118
status_code,
109119
execution,
120+
response_group,
121+
response_body_group,
110122
))),
111123
}
112124
}
@@ -246,9 +258,25 @@ where
246258
};
247259

248260
let (parts, body) = req.into_parts();
261+
262+
// Create groups for this request
263+
let request_group = GroupBuilder::new("request")
264+
.metadata("method", parts.method.to_string())
265+
.metadata("uri", parts.uri.to_string())
266+
.build_with_execution(&execution)
267+
.into_handle();
268+
let request_headers_group = request_group
269+
.child("headers")
270+
.build_with_execution(&execution)
271+
.into_handle();
272+
let request_body_group = request_group
273+
.child("body")
274+
.build_with_execution(&execution)
275+
.into_handle();
276+
249277
ObservationBuilder::new("http/request/headers")
250-
.label("http/request")
251-
.label("http/request/headers")
278+
.group(&request_group)
279+
.group(&request_headers_group)
252280
.metadata("method", parts.method.to_string())
253281
.metadata("uri", parts.uri.to_string())
254282
.serde(&json!(filter_headers(
@@ -262,8 +290,8 @@ where
262290
.map(|collected| collected.to_bytes())
263291
.unwrap_or_else(|_| Bytes::new());
264292
ObservationBuilder::new("http/request/body")
265-
.label("http/request")
266-
.label("http/request/body")
293+
.group(&request_group)
294+
.group(&request_body_group)
267295
.metadata("method", parts.method.to_string())
268296
.metadata("uri", parts.uri.to_string())
269297
.payload(bytes_to_payload(&request_body_bytes, &parts.headers));
@@ -279,9 +307,22 @@ where
279307
500..=599 => LogLevel::Error,
280308
_ => LogLevel::Info,
281309
};
310+
311+
let response_group = GroupBuilder::new("response")
312+
.build_with_execution(&execution)
313+
.into_handle();
314+
let response_headers_group = response_group
315+
.child("headers")
316+
.build_with_execution(&execution)
317+
.into_handle();
318+
let response_body_group = response_group
319+
.child("body")
320+
.build_with_execution(&execution)
321+
.into_handle();
322+
282323
ObservationBuilder::new("http/response/headers")
283-
.label("http/response")
284-
.label("http/response/headers")
324+
.group(&response_group)
325+
.group(&response_headers_group)
285326
.metadata("status", &parts.status.as_u16().to_string())
286327
.log_level(log_level)
287328
.serde(&json!(filter_headers(
@@ -303,6 +344,8 @@ where
303344
log_level,
304345
parts.status.as_u16(),
305346
execution,
347+
response_group,
348+
response_body_group,
306349
);
307350

308351
Ok(Response::from_parts(parts, Body::new(streaming_body)))

crates/observation-tools-client/src/client.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ pub(crate) enum UploaderMessage {
3636
handle: ObservationHandle,
3737
uploaded_tx: tokio::sync::watch::Sender<ObservationUploadResult>,
3838
},
39+
AdditionalPayload {
40+
observation_id: observation_tools_shared::ObservationId,
41+
execution_id: observation_tools_shared::models::ExecutionId,
42+
name: String,
43+
payload: observation_tools_shared::Payload,
44+
},
3945
Flush,
4046
Shutdown,
4147
}
@@ -57,6 +63,15 @@ impl std::fmt::Debug for UploaderMessage {
5763
.field("observations", observations)
5864
.field("handle", handle)
5965
.finish(),
66+
Self::AdditionalPayload {
67+
observation_id,
68+
name,
69+
..
70+
} => f
71+
.debug_struct("AdditionalPayload")
72+
.field("observation_id", observation_id)
73+
.field("name", name)
74+
.finish(),
6075
Self::Flush => write!(f, "Flush"),
6176
Self::Shutdown => write!(f, "Shutdown"),
6277
}
@@ -279,6 +294,15 @@ impl ClientBuilder {
279294
}
280295
}
281296

297+
/// Data for an additional named payload to be sent alongside observations
298+
#[derive(Debug)]
299+
pub(crate) struct AdditionalPayloadData {
300+
pub(crate) observation_id: observation_tools_shared::ObservationId,
301+
pub(crate) execution_id: observation_tools_shared::models::ExecutionId,
302+
pub(crate) name: String,
303+
pub(crate) payload: observation_tools_shared::Payload,
304+
}
305+
282306
async fn uploader_task(
283307
api_client: crate::server_client::Client,
284308
rx: async_channel::Receiver<UploaderMessage>,
@@ -291,12 +315,14 @@ async fn uploader_task(
291315
tokio::sync::watch::Sender<ObservationUploadResult>,
292316
);
293317

294-
let flush_observations = async |buffer: &mut Vec<ObservationWithPayload>,
295-
senders: &mut Vec<ObservationSender>| {
296-
if buffer.is_empty() {
318+
let flush = async |buffer: &mut Vec<ObservationWithPayload>,
319+
senders: &mut Vec<ObservationSender>,
320+
additional_payloads: &mut Vec<AdditionalPayloadData>| {
321+
if buffer.is_empty() && additional_payloads.is_empty() {
297322
return;
298323
}
299-
let result = upload_observations(&api_client, buffer.drain(..).collect()).await;
324+
let result =
325+
upload_observations(&api_client, buffer.drain(..).collect(), additional_payloads.drain(..).collect()).await;
300326
match result {
301327
Ok(()) => {
302328
// Signal all senders that observations were uploaded successfully
@@ -316,6 +342,7 @@ async fn uploader_task(
316342
};
317343
let mut observation_buffer: Vec<ObservationWithPayload> = Vec::new();
318344
let mut sender_buffer: Vec<ObservationSender> = Vec::new();
345+
let mut additional_payload_buffer: Vec<AdditionalPayloadData> = Vec::new();
319346
loop {
320347
let msg = rx.recv().await.ok();
321348
match msg {
@@ -345,14 +372,27 @@ async fn uploader_task(
345372
observation_buffer.extend(observations);
346373
sender_buffer.push((handle, uploaded_tx));
347374
if observation_buffer.len() >= BATCH_SIZE {
348-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
375+
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
349376
}
350377
}
378+
Some(UploaderMessage::AdditionalPayload {
379+
observation_id,
380+
execution_id,
381+
name,
382+
payload,
383+
}) => {
384+
additional_payload_buffer.push(AdditionalPayloadData {
385+
observation_id,
386+
execution_id,
387+
name,
388+
payload,
389+
});
390+
}
351391
Some(UploaderMessage::Flush) => {
352-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
392+
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
353393
}
354394
Some(UploaderMessage::Shutdown) | None => {
355-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
395+
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
356396
break;
357397
}
358398
}
@@ -384,30 +424,41 @@ async fn upload_execution(
384424
async fn upload_observations(
385425
client: &crate::server_client::Client,
386426
observations: Vec<ObservationWithPayload>,
427+
additional_payloads: Vec<AdditionalPayloadData>,
387428
) -> Result<()> {
388-
if observations.is_empty() {
429+
if observations.is_empty() && additional_payloads.is_empty() {
389430
return Ok(());
390431
}
391432

392433
// Group by execution_id
393-
let mut by_execution: std::collections::HashMap<_, Vec<_>> = std::collections::HashMap::new();
434+
let mut by_execution: std::collections::HashMap<_, (Vec<_>, Vec<_>)> =
435+
std::collections::HashMap::new();
394436
for obs in observations {
395437
by_execution
396438
.entry(obs.observation.execution_id)
397439
.or_default()
440+
.0
398441
.push(obs);
399442
}
443+
for ap in additional_payloads {
444+
by_execution
445+
.entry(ap.execution_id)
446+
.or_default()
447+
.1
448+
.push(ap);
449+
}
400450

401451
// Upload each batch via multipart form
402-
for (execution_id, observations) in by_execution {
452+
for (execution_id, (observations, additional_payloads)) in by_execution {
403453
trace!(
404-
"Uploading {} observations for execution {}",
454+
"Uploading {} observations + {} additional payloads for execution {}",
405455
observations.len(),
456+
additional_payloads.len(),
406457
execution_id
407458
);
408459

409460
client
410-
.create_observations_multipart(&execution_id.to_string(), observations)
461+
.create_observations_multipart(&execution_id.to_string(), observations, additional_payloads)
411462
.await
412463
.map_err(|e| crate::error::Error::Config(e.to_string()))?;
413464
}

crates/observation-tools-client/src/execution.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ impl ExecutionHandle {
8484
pub fn base_url(&self) -> &str {
8585
&self.base_url
8686
}
87+
88+
/// Create a placeholder handle (for stub observations when no execution context exists)
89+
pub(crate) fn placeholder() -> Self {
90+
Self {
91+
execution_id: ExecutionId::nil(),
92+
uploader_tx: async_channel::unbounded().0,
93+
base_url: String::new(),
94+
}
95+
}
8796
}
8897

8998
#[napi]

0 commit comments

Comments
 (0)