@@ -4,13 +4,13 @@ use crate::error::Result;
44use crate :: execution:: BeginExecution ;
55use crate :: execution:: ExecutionHandle ;
66use crate :: observation_handle:: ObservationHandle ;
7- use crate :: ObservationWithPayload ;
87use async_channel;
98use log:: error;
109use log:: info;
1110use log:: trace;
1211use napi_derive:: napi;
1312use observation_tools_shared:: models:: Execution ;
13+ use observation_tools_shared:: Observation ;
1414// Re-export constants from shared crate for convenience
1515pub use observation_tools_shared:: BATCH_SIZE ;
1616pub use observation_tools_shared:: BLOB_THRESHOLD_BYTES ;
@@ -31,11 +31,18 @@ pub(crate) enum UploaderMessage {
3131 handle : ExecutionHandle ,
3232 uploaded_tx : tokio:: sync:: watch:: Sender < ExecutionUploadResult > ,
3333 } ,
34- Observations {
35- observations : Vec < ObservationWithPayload > ,
34+ Observation {
35+ observation : Observation ,
3636 handle : ObservationHandle ,
3737 uploaded_tx : tokio:: sync:: watch:: Sender < ObservationUploadResult > ,
3838 } ,
39+ Payload {
40+ observation_id : observation_tools_shared:: ObservationId ,
41+ execution_id : observation_tools_shared:: models:: ExecutionId ,
42+ payload_id : observation_tools_shared:: PayloadId ,
43+ name : String ,
44+ payload : observation_tools_shared:: Payload ,
45+ } ,
3946 Flush ,
4047 Shutdown ,
4148}
@@ -48,15 +55,26 @@ impl std::fmt::Debug for UploaderMessage {
4855 . debug_struct ( "Execution" )
4956 . field ( "execution" , execution)
5057 . finish ( ) ,
51- Self :: Observations {
52- observations ,
58+ Self :: Observation {
59+ observation ,
5360 handle,
5461 ..
5562 } => f
56- . debug_struct ( "Observations " )
57- . field ( "observations " , observations )
63+ . debug_struct ( "Observation " )
64+ . field ( "observation " , observation )
5865 . field ( "handle" , handle)
5966 . finish ( ) ,
67+ Self :: Payload {
68+ observation_id,
69+ payload_id,
70+ name,
71+ ..
72+ } => f
73+ . debug_struct ( "Payload" )
74+ . field ( "observation_id" , observation_id)
75+ . field ( "payload_id" , payload_id)
76+ . field ( "name" , name)
77+ . finish ( ) ,
6078 Self :: Flush => write ! ( f, "Flush" ) ,
6179 Self :: Shutdown => write ! ( f, "Shutdown" ) ,
6280 }
@@ -279,6 +297,18 @@ impl ClientBuilder {
279297 }
280298}
281299
300+ /// Data for a payload ready to be uploaded
301+ #[ derive( Debug ) ]
302+ pub ( crate ) struct PayloadUploadData {
303+ pub ( crate ) observation_id : observation_tools_shared:: ObservationId ,
304+ pub ( crate ) execution_id : observation_tools_shared:: models:: ExecutionId ,
305+ pub ( crate ) payload_id : observation_tools_shared:: PayloadId ,
306+ pub ( crate ) name : String ,
307+ pub ( crate ) mime_type : String ,
308+ pub ( crate ) size : usize ,
309+ pub ( crate ) data : Vec < u8 > ,
310+ }
311+
282312async fn uploader_task (
283313 api_client : crate :: server_client:: Client ,
284314 rx : async_channel:: Receiver < UploaderMessage > ,
@@ -291,12 +321,14 @@ async fn uploader_task(
291321 tokio:: sync:: watch:: Sender < ObservationUploadResult > ,
292322 ) ;
293323
294- let flush_observations = async |buffer : & mut Vec < ObservationWithPayload > ,
295- senders : & mut Vec < ObservationSender > | {
296- if buffer. is_empty ( ) {
324+ let flush = async |observation_buffer : & mut Vec < Observation > ,
325+ senders : & mut Vec < ObservationSender > ,
326+ payload_buffer : & mut Vec < PayloadUploadData > | {
327+ if observation_buffer. is_empty ( ) && payload_buffer. is_empty ( ) {
297328 return ;
298329 }
299- let result = upload_observations ( & api_client, buffer. drain ( ..) . collect ( ) ) . await ;
330+ let result =
331+ upload_observations ( & api_client, observation_buffer. drain ( ..) . collect ( ) , payload_buffer. drain ( ..) . collect ( ) ) . await ;
300332 match result {
301333 Ok ( ( ) ) => {
302334 // Signal all senders that observations were uploaded successfully
@@ -314,8 +346,9 @@ async fn uploader_task(
314346 }
315347 }
316348 } ;
317- let mut observation_buffer: Vec < ObservationWithPayload > = Vec :: new ( ) ;
349+ let mut observation_buffer: Vec < Observation > = Vec :: new ( ) ;
318350 let mut sender_buffer: Vec < ObservationSender > = Vec :: new ( ) ;
351+ let mut payload_buffer: Vec < PayloadUploadData > = Vec :: new ( ) ;
319352 loop {
320353 let msg = rx. recv ( ) . await . ok ( ) ;
321354 match msg {
@@ -327,7 +360,6 @@ async fn uploader_task(
327360 let result = upload_execution ( & api_client, execution) . await ;
328361 match result {
329362 Ok ( ( ) ) => {
330- // Signal successful upload with handle
331363 let _ = uploaded_tx. send ( Some ( Ok ( handle) ) ) ;
332364 }
333365 Err ( e) => {
@@ -337,22 +369,36 @@ async fn uploader_task(
337369 }
338370 }
339371 }
340- Some ( UploaderMessage :: Observations {
341- observations ,
372+ Some ( UploaderMessage :: Observation {
373+ observation ,
342374 handle,
343375 uploaded_tx,
344376 } ) => {
345- observation_buffer. extend ( observations ) ;
377+ observation_buffer. push ( observation ) ;
346378 sender_buffer. push ( ( handle, uploaded_tx) ) ;
347- if observation_buffer. len ( ) >= BATCH_SIZE {
348- flush_observations ( & mut observation_buffer, & mut sender_buffer) . await ;
349- }
379+ }
380+ Some ( UploaderMessage :: Payload {
381+ observation_id,
382+ execution_id,
383+ payload_id,
384+ name,
385+ payload,
386+ } ) => {
387+ payload_buffer. push ( PayloadUploadData {
388+ observation_id,
389+ execution_id,
390+ payload_id,
391+ name,
392+ mime_type : payload. mime_type ,
393+ size : payload. size ,
394+ data : payload. data ,
395+ } ) ;
350396 }
351397 Some ( UploaderMessage :: Flush ) => {
352- flush_observations ( & mut observation_buffer, & mut sender_buffer) . await ;
398+ flush ( & mut observation_buffer, & mut sender_buffer, & mut payload_buffer ) . await ;
353399 }
354400 Some ( UploaderMessage :: Shutdown ) | None => {
355- flush_observations ( & mut observation_buffer, & mut sender_buffer) . await ;
401+ flush ( & mut observation_buffer, & mut sender_buffer, & mut payload_buffer ) . await ;
356402 break ;
357403 }
358404 }
@@ -383,31 +429,42 @@ async fn upload_execution(
383429
384430async fn upload_observations (
385431 client : & crate :: server_client:: Client ,
386- observations : Vec < ObservationWithPayload > ,
432+ observations : Vec < Observation > ,
433+ payloads : Vec < PayloadUploadData > ,
387434) -> Result < ( ) > {
388- if observations. is_empty ( ) {
435+ if observations. is_empty ( ) && payloads . is_empty ( ) {
389436 return Ok ( ( ) ) ;
390437 }
391438
392439 // Group by execution_id
393- let mut by_execution: std:: collections:: HashMap < _ , Vec < _ > > = std:: collections:: HashMap :: new ( ) ;
440+ let mut by_execution: std:: collections:: HashMap < _ , ( Vec < _ > , Vec < _ > ) > =
441+ std:: collections:: HashMap :: new ( ) ;
394442 for obs in observations {
395443 by_execution
396- . entry ( obs. observation . execution_id )
444+ . entry ( obs. execution_id )
397445 . or_default ( )
446+ . 0
398447 . push ( obs) ;
399448 }
449+ for p in payloads {
450+ by_execution
451+ . entry ( p. execution_id )
452+ . or_default ( )
453+ . 1
454+ . push ( p) ;
455+ }
400456
401457 // Upload each batch via multipart form
402- for ( execution_id, observations) in by_execution {
458+ for ( execution_id, ( observations, payloads ) ) in by_execution {
403459 trace ! (
404- "Uploading {} observations for execution {}" ,
460+ "Uploading {} observations + {} payloads for execution {}" ,
405461 observations. len( ) ,
462+ payloads. len( ) ,
406463 execution_id
407464 ) ;
408465
409466 client
410- . create_observations_multipart ( & execution_id. to_string ( ) , observations)
467+ . create_observations_multipart ( & execution_id. to_string ( ) , observations, payloads )
411468 . await
412469 . map_err ( |e| crate :: error:: Error :: Config ( e. to_string ( ) ) ) ?;
413470 }
0 commit comments