Skip to content

Commit a8cb401

Browse files
Dig-Dougclaude
andcommitted
Unify payload handling with PayloadBuilder and payload manifest
Remove mime_type/payload_size from Observation struct since these are per-payload concerns, not observation-level metadata. All payloads now flow through a single unified path instead of separate primary and additional payload code paths. Key changes: - Add PayloadBuilder for creating named payloads in shared crate - Replace ObservationWithPayload with separate Observation + Payload messages in the uploader channel - Send payload_manifest JSON in multipart uploads so the server gets authoritative MIME types (falls back to heuristic for old clients) - Add mime_type/size to GetPayload API response for per-payload metadata - Fix named_payload sending payload twice (as both primary and additional) - Update templates to derive type/size from per-payload data Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ab936b9 commit a8cb401

File tree

17 files changed

+265
-234
lines changed

17 files changed

+265
-234
lines changed

crates/observation-tools-client/openapi.json

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,22 @@
116116
"id": {
117117
"$ref": "#/components/schemas/PayloadId"
118118
},
119+
"mime_type": {
120+
"type": "string"
121+
},
119122
"name": {
120123
"type": "string"
124+
},
125+
"size": {
126+
"minimum": 0,
127+
"type": "integer"
121128
}
122129
},
123130
"required": [
124131
"id",
125132
"name",
133+
"mime_type",
134+
"size",
126135
"data"
127136
],
128137
"type": "object"
@@ -222,10 +231,6 @@
222231
},
223232
"type": "object"
224233
},
225-
"mime_type": {
226-
"description": "MIME type of the payload (e.g., \"text/plain\", \"application/json\")",
227-
"type": "string"
228-
},
229234
"name": {
230235
"description": "User-defined name for this observation",
231236
"type": "string"
@@ -244,11 +249,6 @@
244249
"nullable": true,
245250
"type": "string"
246251
},
247-
"payload_size": {
248-
"description": "Size of the payload in bytes",
249-
"minimum": 0,
250-
"type": "integer"
251-
},
252252
"source": {
253253
"$ref": "#/components/schemas/SourceInfo",
254254
"description": "Source location where this observation was created",
@@ -261,9 +261,7 @@
261261
"name",
262262
"observation_type",
263263
"log_level",
264-
"created_at",
265-
"mime_type",
266-
"payload_size"
264+
"created_at"
267265
],
268266
"type": "object"
269267
},

crates/observation-tools-client/src/client.rs

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use crate::error::Result;
44
use crate::execution::BeginExecution;
55
use crate::execution::ExecutionHandle;
66
use crate::observation_handle::ObservationHandle;
7-
use crate::ObservationWithPayload;
87
use async_channel;
98
use log::error;
109
use log::info;
1110
use log::trace;
1211
use napi_derive::napi;
1312
use observation_tools_shared::models::Execution;
13+
use observation_tools_shared::Observation;
1414
// Re-export constants from shared crate for convenience
1515
pub use observation_tools_shared::BATCH_SIZE;
1616
pub use observation_tools_shared::BLOB_THRESHOLD_BYTES;
@@ -31,12 +31,12 @@ pub(crate) enum UploaderMessage {
3131
handle: ExecutionHandle,
3232
uploaded_tx: tokio::sync::watch::Sender<ExecutionUploadResult>,
3333
},
34-
Observations {
35-
observations: Vec<ObservationWithPayload>,
34+
Observation {
35+
observation: Observation,
3636
handle: ObservationHandle,
3737
uploaded_tx: tokio::sync::watch::Sender<ObservationUploadResult>,
3838
},
39-
AdditionalPayload {
39+
Payload {
4040
observation_id: observation_tools_shared::ObservationId,
4141
execution_id: observation_tools_shared::models::ExecutionId,
4242
payload_id: observation_tools_shared::PayloadId,
@@ -55,22 +55,22 @@ impl std::fmt::Debug for UploaderMessage {
5555
.debug_struct("Execution")
5656
.field("execution", execution)
5757
.finish(),
58-
Self::Observations {
59-
observations,
58+
Self::Observation {
59+
observation,
6060
handle,
6161
..
6262
} => f
63-
.debug_struct("Observations")
64-
.field("observations", observations)
63+
.debug_struct("Observation")
64+
.field("observation", observation)
6565
.field("handle", handle)
6666
.finish(),
67-
Self::AdditionalPayload {
67+
Self::Payload {
6868
observation_id,
6969
payload_id,
7070
name,
7171
..
7272
} => f
73-
.debug_struct("AdditionalPayload")
73+
.debug_struct("Payload")
7474
.field("observation_id", observation_id)
7575
.field("payload_id", payload_id)
7676
.field("name", name)
@@ -297,14 +297,16 @@ impl ClientBuilder {
297297
}
298298
}
299299

300-
/// Data for an additional named payload to be sent alongside observations
300+
/// Data for a payload ready to be uploaded
301301
#[derive(Debug)]
302-
pub(crate) struct AdditionalPayloadData {
302+
pub(crate) struct PayloadUploadData {
303303
pub(crate) observation_id: observation_tools_shared::ObservationId,
304304
pub(crate) execution_id: observation_tools_shared::models::ExecutionId,
305305
pub(crate) payload_id: observation_tools_shared::PayloadId,
306306
pub(crate) name: String,
307-
pub(crate) payload: observation_tools_shared::Payload,
307+
pub(crate) mime_type: String,
308+
pub(crate) size: usize,
309+
pub(crate) data: Vec<u8>,
308310
}
309311

310312
async fn uploader_task(
@@ -319,14 +321,14 @@ async fn uploader_task(
319321
tokio::sync::watch::Sender<ObservationUploadResult>,
320322
);
321323

322-
let flush = async |buffer: &mut Vec<ObservationWithPayload>,
324+
let flush = async |observation_buffer: &mut Vec<Observation>,
323325
senders: &mut Vec<ObservationSender>,
324-
additional_payloads: &mut Vec<AdditionalPayloadData>| {
325-
if buffer.is_empty() && additional_payloads.is_empty() {
326+
payload_buffer: &mut Vec<PayloadUploadData>| {
327+
if observation_buffer.is_empty() && payload_buffer.is_empty() {
326328
return;
327329
}
328330
let result =
329-
upload_observations(&api_client, buffer.drain(..).collect(), additional_payloads.drain(..).collect()).await;
331+
upload_observations(&api_client, observation_buffer.drain(..).collect(), payload_buffer.drain(..).collect()).await;
330332
match result {
331333
Ok(()) => {
332334
// Signal all senders that observations were uploaded successfully
@@ -344,9 +346,9 @@ async fn uploader_task(
344346
}
345347
}
346348
};
347-
let mut observation_buffer: Vec<ObservationWithPayload> = Vec::new();
349+
let mut observation_buffer: Vec<Observation> = Vec::new();
348350
let mut sender_buffer: Vec<ObservationSender> = Vec::new();
349-
let mut additional_payload_buffer: Vec<AdditionalPayloadData> = Vec::new();
351+
let mut payload_buffer: Vec<PayloadUploadData> = Vec::new();
350352
loop {
351353
let msg = rx.recv().await.ok();
352354
match msg {
@@ -358,7 +360,6 @@ async fn uploader_task(
358360
let result = upload_execution(&api_client, execution).await;
359361
match result {
360362
Ok(()) => {
361-
// Signal successful upload with handle
362363
let _ = uploaded_tx.send(Some(Ok(handle)));
363364
}
364365
Err(e) => {
@@ -368,37 +369,36 @@ async fn uploader_task(
368369
}
369370
}
370371
}
371-
Some(UploaderMessage::Observations {
372-
observations,
372+
Some(UploaderMessage::Observation {
373+
observation,
373374
handle,
374375
uploaded_tx,
375376
}) => {
376-
observation_buffer.extend(observations);
377+
observation_buffer.push(observation);
377378
sender_buffer.push((handle, uploaded_tx));
378-
if observation_buffer.len() >= BATCH_SIZE {
379-
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
380-
}
381379
}
382-
Some(UploaderMessage::AdditionalPayload {
380+
Some(UploaderMessage::Payload {
383381
observation_id,
384382
execution_id,
385383
payload_id,
386384
name,
387385
payload,
388386
}) => {
389-
additional_payload_buffer.push(AdditionalPayloadData {
387+
payload_buffer.push(PayloadUploadData {
390388
observation_id,
391389
execution_id,
392390
payload_id,
393391
name,
394-
payload,
392+
mime_type: payload.mime_type,
393+
size: payload.size,
394+
data: payload.data,
395395
});
396396
}
397397
Some(UploaderMessage::Flush) => {
398-
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
398+
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
399399
}
400400
Some(UploaderMessage::Shutdown) | None => {
401-
flush(&mut observation_buffer, &mut sender_buffer, &mut additional_payload_buffer).await;
401+
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
402402
break;
403403
}
404404
}
@@ -429,10 +429,10 @@ async fn upload_execution(
429429

430430
async fn upload_observations(
431431
client: &crate::server_client::Client,
432-
observations: Vec<ObservationWithPayload>,
433-
additional_payloads: Vec<AdditionalPayloadData>,
432+
observations: Vec<Observation>,
433+
payloads: Vec<PayloadUploadData>,
434434
) -> Result<()> {
435-
if observations.is_empty() && additional_payloads.is_empty() {
435+
if observations.is_empty() && payloads.is_empty() {
436436
return Ok(());
437437
}
438438

@@ -441,30 +441,30 @@ async fn upload_observations(
441441
std::collections::HashMap::new();
442442
for obs in observations {
443443
by_execution
444-
.entry(obs.observation.execution_id)
444+
.entry(obs.execution_id)
445445
.or_default()
446446
.0
447447
.push(obs);
448448
}
449-
for ap in additional_payloads {
449+
for p in payloads {
450450
by_execution
451-
.entry(ap.execution_id)
451+
.entry(p.execution_id)
452452
.or_default()
453453
.1
454-
.push(ap);
454+
.push(p);
455455
}
456456

457457
// Upload each batch via multipart form
458-
for (execution_id, (observations, additional_payloads)) in by_execution {
458+
for (execution_id, (observations, payloads)) in by_execution {
459459
trace!(
460-
"Uploading {} observations + {} additional payloads for execution {}",
460+
"Uploading {} observations + {} payloads for execution {}",
461461
observations.len(),
462-
additional_payloads.len(),
462+
payloads.len(),
463463
execution_id
464464
);
465465

466466
client
467-
.create_observations_multipart(&execution_id.to_string(), observations, additional_payloads)
467+
.create_observations_multipart(&execution_id.to_string(), observations, payloads)
468468
.await
469469
.map_err(|e| crate::error::Error::Config(e.to_string()))?;
470470
}

0 commit comments

Comments
 (0)