Skip to content

Commit b790626

Browse files
committed
Split UploaderMessage into separate Observation and Payload messages
Refactor client transport layer for multi-payload support: - Split UploaderMessage::Observations into ::Observation and ::Payload - Add PayloadUploadData for payload transport - Add send_with_execution_and_name with default name "default" - Update server_client multipart upload for separate payloads - Remove ObservationWithPayload type - Rename labels to metadata in logger
1 parent d8beb7a commit b790626

File tree

5 files changed

+175
-83
lines changed

5 files changed

+175
-83
lines changed

crates/observation-tools-client/src/client.rs

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use crate::error::Result;
44
use crate::execution::BeginExecution;
55
use crate::execution::ExecutionHandle;
66
use crate::observation_handle::ObservationHandle;
7-
use crate::ObservationWithPayload;
87
use async_channel;
98
use log::error;
109
use log::info;
1110
use log::trace;
1211
use napi_derive::napi;
1312
use observation_tools_shared::models::Execution;
13+
use observation_tools_shared::Observation;
1414
// Re-export constants from shared crate for convenience
1515
pub use observation_tools_shared::BATCH_SIZE;
1616
pub use observation_tools_shared::BLOB_THRESHOLD_BYTES;
@@ -31,11 +31,18 @@ pub(crate) enum UploaderMessage {
3131
handle: ExecutionHandle,
3232
uploaded_tx: tokio::sync::watch::Sender<ExecutionUploadResult>,
3333
},
34-
Observations {
35-
observations: Vec<ObservationWithPayload>,
34+
Observation {
35+
observation: Observation,
3636
handle: ObservationHandle,
3737
uploaded_tx: tokio::sync::watch::Sender<ObservationUploadResult>,
3838
},
39+
Payload {
40+
observation_id: observation_tools_shared::ObservationId,
41+
execution_id: observation_tools_shared::models::ExecutionId,
42+
payload_id: observation_tools_shared::PayloadId,
43+
name: String,
44+
payload: observation_tools_shared::Payload,
45+
},
3946
Flush,
4047
Shutdown,
4148
}
@@ -48,15 +55,26 @@ impl std::fmt::Debug for UploaderMessage {
4855
.debug_struct("Execution")
4956
.field("execution", execution)
5057
.finish(),
51-
Self::Observations {
52-
observations,
58+
Self::Observation {
59+
observation,
5360
handle,
5461
..
5562
} => f
56-
.debug_struct("Observations")
57-
.field("observations", observations)
63+
.debug_struct("Observation")
64+
.field("observation", observation)
5865
.field("handle", handle)
5966
.finish(),
67+
Self::Payload {
68+
observation_id,
69+
payload_id,
70+
name,
71+
..
72+
} => f
73+
.debug_struct("Payload")
74+
.field("observation_id", observation_id)
75+
.field("payload_id", payload_id)
76+
.field("name", name)
77+
.finish(),
6078
Self::Flush => write!(f, "Flush"),
6179
Self::Shutdown => write!(f, "Shutdown"),
6280
}
@@ -279,6 +297,18 @@ impl ClientBuilder {
279297
}
280298
}
281299

300+
/// Data for a payload ready to be uploaded
301+
#[derive(Debug)]
302+
pub(crate) struct PayloadUploadData {
303+
pub(crate) observation_id: observation_tools_shared::ObservationId,
304+
pub(crate) execution_id: observation_tools_shared::models::ExecutionId,
305+
pub(crate) payload_id: observation_tools_shared::PayloadId,
306+
pub(crate) name: String,
307+
pub(crate) mime_type: String,
308+
pub(crate) size: usize,
309+
pub(crate) data: Vec<u8>,
310+
}
311+
282312
async fn uploader_task(
283313
api_client: crate::server_client::Client,
284314
rx: async_channel::Receiver<UploaderMessage>,
@@ -291,12 +321,14 @@ async fn uploader_task(
291321
tokio::sync::watch::Sender<ObservationUploadResult>,
292322
);
293323

294-
let flush_observations = async |buffer: &mut Vec<ObservationWithPayload>,
295-
senders: &mut Vec<ObservationSender>| {
296-
if buffer.is_empty() {
324+
let flush = async |observation_buffer: &mut Vec<Observation>,
325+
senders: &mut Vec<ObservationSender>,
326+
payload_buffer: &mut Vec<PayloadUploadData>| {
327+
if observation_buffer.is_empty() && payload_buffer.is_empty() {
297328
return;
298329
}
299-
let result = upload_observations(&api_client, buffer.drain(..).collect()).await;
330+
let result =
331+
upload_observations(&api_client, observation_buffer.drain(..).collect(), payload_buffer.drain(..).collect()).await;
300332
match result {
301333
Ok(()) => {
302334
// Signal all senders that observations were uploaded successfully
@@ -314,8 +346,9 @@ async fn uploader_task(
314346
}
315347
}
316348
};
317-
let mut observation_buffer: Vec<ObservationWithPayload> = Vec::new();
349+
let mut observation_buffer: Vec<Observation> = Vec::new();
318350
let mut sender_buffer: Vec<ObservationSender> = Vec::new();
351+
let mut payload_buffer: Vec<PayloadUploadData> = Vec::new();
319352
loop {
320353
let msg = rx.recv().await.ok();
321354
match msg {
@@ -327,7 +360,6 @@ async fn uploader_task(
327360
let result = upload_execution(&api_client, execution).await;
328361
match result {
329362
Ok(()) => {
330-
// Signal successful upload with handle
331363
let _ = uploaded_tx.send(Some(Ok(handle)));
332364
}
333365
Err(e) => {
@@ -337,22 +369,36 @@ async fn uploader_task(
337369
}
338370
}
339371
}
340-
Some(UploaderMessage::Observations {
341-
observations,
372+
Some(UploaderMessage::Observation {
373+
observation,
342374
handle,
343375
uploaded_tx,
344376
}) => {
345-
observation_buffer.extend(observations);
377+
observation_buffer.push(observation);
346378
sender_buffer.push((handle, uploaded_tx));
347-
if observation_buffer.len() >= BATCH_SIZE {
348-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
349-
}
379+
}
380+
Some(UploaderMessage::Payload {
381+
observation_id,
382+
execution_id,
383+
payload_id,
384+
name,
385+
payload,
386+
}) => {
387+
payload_buffer.push(PayloadUploadData {
388+
observation_id,
389+
execution_id,
390+
payload_id,
391+
name,
392+
mime_type: payload.mime_type,
393+
size: payload.size,
394+
data: payload.data,
395+
});
350396
}
351397
Some(UploaderMessage::Flush) => {
352-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
398+
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
353399
}
354400
Some(UploaderMessage::Shutdown) | None => {
355-
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
401+
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
356402
break;
357403
}
358404
}
@@ -383,31 +429,42 @@ async fn upload_execution(
383429

384430
async fn upload_observations(
385431
client: &crate::server_client::Client,
386-
observations: Vec<ObservationWithPayload>,
432+
observations: Vec<Observation>,
433+
payloads: Vec<PayloadUploadData>,
387434
) -> Result<()> {
388-
if observations.is_empty() {
435+
if observations.is_empty() && payloads.is_empty() {
389436
return Ok(());
390437
}
391438

392439
// Group by execution_id
393-
let mut by_execution: std::collections::HashMap<_, Vec<_>> = std::collections::HashMap::new();
440+
let mut by_execution: std::collections::HashMap<_, (Vec<_>, Vec<_>)> =
441+
std::collections::HashMap::new();
394442
for obs in observations {
395443
by_execution
396-
.entry(obs.observation.execution_id)
444+
.entry(obs.execution_id)
397445
.or_default()
446+
.0
398447
.push(obs);
399448
}
449+
for p in payloads {
450+
by_execution
451+
.entry(p.execution_id)
452+
.or_default()
453+
.1
454+
.push(p);
455+
}
400456

401457
// Upload each batch via multipart form
402-
for (execution_id, observations) in by_execution {
458+
for (execution_id, (observations, payloads)) in by_execution {
403459
trace!(
404-
"Uploading {} observations for execution {}",
460+
"Uploading {} observations + {} payloads for execution {}",
405461
observations.len(),
462+
payloads.len(),
406463
execution_id
407464
);
408465

409466
client
410-
.create_observations_multipart(&execution_id.to_string(), observations)
467+
.create_observations_multipart(&execution_id.to_string(), observations, payloads)
411468
.await
412469
.map_err(|e| crate::error::Error::Config(e.to_string()))?;
413470
}

crates/observation-tools-client/src/lib.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub use observation_handle::SendObservation;
3333
// Re-export procedural macro
3434
pub use observation_tools_macros::observe;
3535
// Re-export from shared for convenience
36-
use observation_tools_shared::Observation;
3736
pub use observation_tools_shared::Payload;
37+
pub use observation_tools_shared::PayloadBuilder;
3838

3939
/// Register a global execution shared across all threads
4040
///
@@ -64,9 +64,3 @@ pub fn current_execution() -> Option<ExecutionHandle> {
6464
pub fn clear_global_execution() {
6565
context::clear_global_execution()
6666
}
67-
68-
#[derive(Debug)]
69-
struct ObservationWithPayload {
70-
observation: Observation,
71-
payload: Payload,
72-
}

crates/observation-tools-client/src/logger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Log for ObservationLogger {
4040
let builder = ObservationBuilder::new("ObservationLogger")
4141
.observation_type(ObservationType::LogEntry)
4242
.log_level(record.level().into())
43-
.label(format!("log/{}", record.target()));
43+
.metadata("target", record.target());
4444
let builder = if let (Some(file), Some(line)) = (record.file(), record.line()) {
4545
builder.source(file, line)
4646
} else {

crates/observation-tools-client/src/observation.rs

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ use crate::execution::ExecutionHandle;
77
use crate::observation_handle::ObservationHandle;
88
use crate::observation_handle::SendObservation;
99
use crate::Error;
10-
use crate::ObservationWithPayload;
1110
use napi_derive::napi;
11+
use observation_tools_shared::GroupId;
1212
use observation_tools_shared::LogLevel;
1313
use observation_tools_shared::Markdown;
1414
use observation_tools_shared::Observation;
1515
use observation_tools_shared::ObservationId;
1616
use observation_tools_shared::ObservationType;
1717
use observation_tools_shared::Payload;
18+
use observation_tools_shared::PayloadId;
1819
use observation_tools_shared::SourceInfo;
1920
use serde::Serialize;
2021
use std::any::TypeId;
@@ -32,7 +33,7 @@ use std::fmt::Debug;
3233
#[napi]
3334
pub struct ObservationBuilder {
3435
name: String,
35-
labels: Vec<String>,
36+
group_ids: Vec<GroupId>,
3637
metadata: HashMap<String, String>,
3738
source: Option<SourceInfo>,
3839
parent_span_id: Option<String>,
@@ -49,7 +50,7 @@ impl ObservationBuilder {
4950
pub fn new<T: AsRef<str>>(name: T) -> Self {
5051
Self {
5152
name: name.as_ref().to_string(),
52-
labels: Vec::new(),
53+
group_ids: Vec::new(),
5354
metadata: HashMap::new(),
5455
source: None,
5556
parent_span_id: None,
@@ -78,18 +79,6 @@ impl ObservationBuilder {
7879
self
7980
}
8081

81-
/// Add a label to the observation
82-
pub fn label(mut self, label: impl Into<String>) -> Self {
83-
self.labels.push(label.into());
84-
self
85-
}
86-
87-
/// Add multiple labels to the observation
88-
pub fn labels(mut self, labels: impl IntoIterator<Item = impl Into<String>>) -> Self {
89-
self.labels.extend(labels.into_iter().map(|l| l.into()));
90-
self
91-
}
92-
9382
/// Add metadata to the observation
9483
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
9584
self.metadata.insert(key.into(), value.into());
@@ -155,7 +144,7 @@ impl ObservationBuilder {
155144
}
156145

157146
/// Internal method to build and send the observation
158-
fn send_observation(mut self, payload: Payload) -> SendObservation {
147+
pub(crate) fn send_observation(mut self, payload: Payload) -> SendObservation {
159148
let Some(execution) = self.execution.take().or_else(context::get_current_execution) else {
160149
log::error!(
161150
"No execution context available for observation '{}'",
@@ -167,11 +156,22 @@ impl ObservationBuilder {
167156
self.send_with_execution(payload, &execution)
168157
}
169158

170-
/// Internal method to build and send the observation with a resolved execution
159+
/// Internal method to build and send the observation with a resolved execution.
160+
/// Sends the default payload with name "default".
171161
fn send_with_execution(
172162
self,
173163
payload: Payload,
174164
execution: &ExecutionHandle,
165+
) -> SendObservation {
166+
self.send_with_execution_and_name(payload, "default", execution)
167+
}
168+
169+
/// Internal method to build and send the observation with a named payload.
170+
fn send_with_execution_and_name(
171+
self,
172+
payload: Payload,
173+
payload_name: impl Into<String>,
174+
execution: &ExecutionHandle,
175175
) -> SendObservation {
176176
let observation_id = self.custom_id.unwrap_or_else(ObservationId::new);
177177

@@ -196,13 +196,12 @@ impl ObservationBuilder {
196196
name: self.name,
197197
observation_type: self.observation_type,
198198
log_level: self.log_level,
199-
labels: self.labels,
199+
group_ids: self.group_ids,
200+
parent_group_id: None,
200201
metadata: self.metadata,
201202
source: self.source,
202203
parent_span_id,
203204
created_at: chrono::Utc::now(),
204-
mime_type: payload.mime_type.clone(),
205-
payload_size: payload.size,
206205
};
207206

208207
let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel::<ObservationUploadResult>(None);
@@ -215,13 +214,11 @@ impl ObservationBuilder {
215214
observation_id
216215
);
217216

217+
// Send observation metadata
218218
if let Err(e) = execution
219219
.uploader_tx
220-
.try_send(UploaderMessage::Observations {
221-
observations: vec![ObservationWithPayload {
222-
observation,
223-
payload,
224-
}],
220+
.try_send(UploaderMessage::Observation {
221+
observation,
225222
handle: handle.clone(),
226223
uploaded_tx,
227224
})
@@ -230,6 +227,15 @@ impl ObservationBuilder {
230227
return SendObservation::stub(Error::ChannelClosed);
231228
}
232229

230+
// Send the payload as a separate message
231+
let _ = execution.uploader_tx.try_send(UploaderMessage::Payload {
232+
observation_id,
233+
execution_id: execution.id(),
234+
payload_id: PayloadId::new(),
235+
name: payload_name.into(),
236+
payload,
237+
});
238+
233239
SendObservation::new(handle, uploaded_rx)
234240
}
235241
}
@@ -275,10 +281,10 @@ impl ObservationBuilder {
275281
Ok(self)
276282
}
277283

278-
/// Add a label to the observation
279-
#[napi(js_name = "label")]
280-
pub fn label_napi(&mut self, label: String) -> &Self {
281-
self.labels.push(label);
284+
/// Add a group to the observation by group ID string
285+
#[napi(js_name = "group")]
286+
pub fn group_napi(&mut self, group_id: String) -> &Self {
287+
self.group_ids.push(GroupId::parse(&group_id));
282288
self
283289
}
284290

0 commit comments

Comments
 (0)