diff --git a/crates/observation-tools-client/index.d.ts b/crates/observation-tools-client/index.d.ts index 6ab0766..7125b20 100644 --- a/crates/observation-tools-client/index.d.ts +++ b/crates/observation-tools-client/index.d.ts @@ -51,7 +51,12 @@ export declare class ExecutionHandle { ): string; } -/** Builder for creating observations */ +/** + * Builder for creating observations (without payload set yet) + * + * Call `.payload()` or `.custom_payload()` to get an + * `ObservationBuilderWithPayload` that can be built. + */ export declare class ObservationBuilder { /** Create a new observation builder with the given name */ constructor(name: string); @@ -62,16 +67,28 @@ export declare class ObservationBuilder { /** Set the source info for the observation */ source(file: string, line: number): this; /** Set the payload as JSON data */ - jsonPayload(jsonString: string): this; + jsonPayload(jsonString: string): ObservationBuilderWithPayload; /** Set the payload with custom data and MIME type */ - rawPayload(data: string, mimeType: string): this; + rawPayload(data: string, mimeType: string): ObservationBuilderWithPayload; /** Set the payload as markdown content */ - markdownPayload(content: string): this; + markdownPayload(content: string): ObservationBuilderWithPayload; +} + +/** + * Builder for creating observations (with payload set) + * + * This struct is returned by `ObservationBuilder::payload()` and + * `ObservationBuilder::custom_payload()`. It has the `build()` methods + * since a payload is required. + */ +export declare class ObservationBuilderWithPayload { /** * Build and send the observation * * Returns a SendObservation which allows you to wait for the upload to * complete or get the ObservationHandle immediately. + * + * If sending fails, returns a stub that will fail on `wait_for_upload()`. */ send(execution: ExecutionHandle): SendObservation; } diff --git a/crates/observation-tools-client/index.js b/crates/observation-tools-client/index.js index a0712d2..aea348d 100644 --- a/crates/observation-tools-client/index.js +++ b/crates/observation-tools-client/index.js @@ -765,6 +765,7 @@ module.exports.Client = nativeBinding.Client; module.exports.ClientBuilder = nativeBinding.ClientBuilder; module.exports.ExecutionHandle = nativeBinding.ExecutionHandle; module.exports.ObservationBuilder = nativeBinding.ObservationBuilder; +module.exports.ObservationBuilderWithPayload = nativeBinding.ObservationBuilderWithPayload; module.exports.ObservationHandle = nativeBinding.ObservationHandle; module.exports.SendObservation = nativeBinding.SendObservation; module.exports.generateExecutionId = nativeBinding.generateExecutionId; diff --git a/crates/observation-tools-client/src/client.rs b/crates/observation-tools-client/src/client.rs index 1b33085..9bc0127 100644 --- a/crates/observation-tools-client/src/client.rs +++ b/crates/observation-tools-client/src/client.rs @@ -3,6 +3,7 @@ use crate::error::Result; use crate::execution::BeginExecution; use crate::execution::ExecutionHandle; +use crate::observation_handle::ObservationHandle; use async_channel; use log::error; use log::info; @@ -15,21 +16,31 @@ pub use observation_tools_shared::BATCH_SIZE; pub use observation_tools_shared::BLOB_THRESHOLD_BYTES; use std::sync::Arc; +/// Result type for observation upload completion notifications via watch +/// channel Uses String for error since crate::Error doesn't implement Clone +pub(crate) type ObservationUploadResult = Option>; + +/// Result type for execution upload completion notifications via watch channel +/// Uses String for error since crate::Error doesn't implement Clone +pub(crate) type ExecutionUploadResult = Option>; + /// Message types for the background uploader task pub(crate) enum UploaderMessage { Execution { execution: Execution, - uploaded_tx: tokio::sync::oneshot::Sender<()>, + handle: ExecutionHandle, + uploaded_tx: tokio::sync::watch::Sender, }, Observations { observations: Vec, - uploaded_tx: tokio::sync::oneshot::Sender<()>, + handle: ObservationHandle, + uploaded_tx: tokio::sync::watch::Sender, }, Flush, Shutdown, } -// Manual Debug implementation since oneshot::Sender doesn't implement Debug +// Manual Debug implementation since watch::Sender doesn't implement Debug impl std::fmt::Debug for UploaderMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -37,9 +48,14 @@ impl std::fmt::Debug for UploaderMessage { .debug_struct("Execution") .field("execution", execution) .finish(), - Self::Observations { observations, .. } => f + Self::Observations { + observations, + handle, + .. + } => f .debug_struct("Observations") .field("observations", observations) + .field("handle", handle) .finish(), Self::Flush => write!(f, "Flush"), Self::Shutdown => write!(f, "Shutdown"), @@ -114,19 +130,20 @@ impl Client { fn begin_execution_internal(&self, execution: Execution) -> Result { trace!("Beginning new execution with ID {}", execution.id); - let (uploaded_tx, uploaded_rx) = tokio::sync::oneshot::channel(); + let handle = ExecutionHandle::new( + execution.id, + self.inner.uploader_tx.clone(), + self.inner.base_url.clone(), + ); + let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel(None); self .inner .uploader_tx .try_send(UploaderMessage::Execution { execution: execution.clone(), + handle: handle.clone(), uploaded_tx, })?; - let handle = ExecutionHandle::new( - execution.id, - self.inner.uploader_tx.clone(), - self.inner.base_url.clone(), - ); Ok(BeginExecution::new(handle, uploaded_rx)) } @@ -256,52 +273,66 @@ async fn uploader_task( rx: async_channel::Receiver, ) { info!("Uploader task started"); - let flush_observations = - async |buffer: &mut Vec, senders: &mut Vec>| { - if buffer.is_empty() { - return; - } - let result = upload_observations(&api_client, buffer.drain(..).collect()).await; - match result { - Ok(()) => { - // Signal all senders that observations were uploaded - for sender in senders.drain(..) { - let _ = sender.send(()); - } + + // Buffer type for observation senders: (handle, sender) + type ObservationSender = ( + ObservationHandle, + tokio::sync::watch::Sender, + ); + + let flush_observations = async |buffer: &mut Vec, + senders: &mut Vec| { + if buffer.is_empty() { + return; + } + let result = upload_observations(&api_client, buffer.drain(..).collect()).await; + match result { + Ok(()) => { + // Signal all senders that observations were uploaded successfully + for (handle, sender) in senders.drain(..) { + let _ = sender.send(Some(Ok(handle))); } - Err(e) => { - error!("Failed to upload observations: {}", e); - // Clear senders on error (they won't receive notification) - senders.clear(); + } + Err(e) => { + let error_msg = e.to_string(); + error!("Failed to upload observations: {}", error_msg); + // Signal all senders with the error (as String for Clone compatibility) + for (_, sender) in senders.drain(..) { + let _ = sender.send(Some(Err(error_msg.clone()))); } } - }; + } + }; let mut observation_buffer: Vec = Vec::new(); - let mut sender_buffer: Vec> = Vec::new(); + let mut sender_buffer: Vec = Vec::new(); loop { let msg = rx.recv().await.ok(); match msg { Some(UploaderMessage::Execution { execution, + handle, uploaded_tx, }) => { let result = upload_execution(&api_client, execution).await; match result { Ok(()) => { - // Signal successful upload - let _ = uploaded_tx.send(()); + // Signal successful upload with handle + let _ = uploaded_tx.send(Some(Ok(handle))); } Err(e) => { - error!("Failed to upload execution: {}", e); + let error_msg = e.to_string(); + error!("Failed to upload execution: {}", error_msg); + let _ = uploaded_tx.send(Some(Err(error_msg))); } } } Some(UploaderMessage::Observations { observations, + handle, uploaded_tx, }) => { observation_buffer.extend(observations); - sender_buffer.push(uploaded_tx); + sender_buffer.push((handle, uploaded_tx)); if observation_buffer.len() >= BATCH_SIZE { flush_observations(&mut observation_buffer, &mut sender_buffer).await; } diff --git a/crates/observation-tools-client/src/error.rs b/crates/observation-tools-client/src/error.rs index fc402e1..248c3b0 100644 --- a/crates/observation-tools-client/src/error.rs +++ b/crates/observation-tools-client/src/error.rs @@ -40,6 +40,10 @@ pub enum Error { #[error("Failed to send uploader message: {0}")] TrySendError(String), + + /// Upload failed with error + #[error("Upload failed: {0}")] + UploadFailed(String), } impl From> for Error { diff --git a/crates/observation-tools-client/src/execution.rs b/crates/observation-tools-client/src/execution.rs index 12e3b7f..7bcc91d 100644 --- a/crates/observation-tools-client/src/execution.rs +++ b/crates/observation-tools-client/src/execution.rs @@ -1,22 +1,26 @@ //! Execution handle for managing observation context +use crate::client::ExecutionUploadResult; +use crate::client::ObservationUploadResult; use crate::client::UploaderMessage; use crate::error::Result; +use crate::observation_handle::ObservationHandle; use crate::Error; use async_channel; use napi_derive::napi; use observation_tools_shared::models::ExecutionId; use observation_tools_shared::models::Observation; +use observation_tools_shared::models::ObservationId; pub struct BeginExecution { handle: ExecutionHandle, - uploaded_rx: tokio::sync::oneshot::Receiver<()>, + uploaded_rx: tokio::sync::watch::Receiver, } impl BeginExecution { pub(crate) fn new( handle: ExecutionHandle, - uploaded_rx: tokio::sync::oneshot::Receiver<()>, + uploaded_rx: tokio::sync::watch::Receiver, ) -> Self { Self { handle, @@ -25,9 +29,23 @@ impl BeginExecution { } /// Wait for the execution to be uploaded to the server - pub async fn wait_for_upload(self) -> Result { - self.uploaded_rx.await.map_err(|_| Error::ChannelClosed)?; - Ok(self.handle) + pub async fn wait_for_upload(mut self) -> Result { + // Wait for value to change from None to Some + loop { + { + let value = self.uploaded_rx.borrow_and_update(); + match &*value { + Some(Ok(handle)) => return Ok(handle.clone()), + Some(Err(error_msg)) => return Err(Error::UploadFailed(error_msg.clone())), + None => {} + } + } + self + .uploaded_rx + .changed() + .await + .map_err(|_| Error::ChannelClosed)?; + } } pub fn handle(&self) -> &ExecutionHandle { @@ -41,7 +59,7 @@ impl BeginExecution { /// Handle to an execution that can be used to send observations #[napi] -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ExecutionHandle { pub(crate) execution_id: ExecutionId, pub(crate) uploader_tx: async_channel::Sender, @@ -73,13 +91,20 @@ impl ExecutionHandle { /// Send an observation (internal use, doesn't wait for upload) pub(crate) fn send_observation(&self, observation: Observation) -> Result<()> { - // Create a oneshot channel but drop the receiver since we don't wait - let (uploaded_tx, _uploaded_rx) = tokio::sync::oneshot::channel(); + // Create a watch channel but drop the receiver since we don't wait + let (uploaded_tx, _uploaded_rx) = tokio::sync::watch::channel::(None); + + let handle = ObservationHandle { + base_url: self.base_url.clone(), + observation_id: observation.id, + execution_id: self.execution_id, + }; self .uploader_tx .try_send(UploaderMessage::Observations { observations: vec![observation], + handle, uploaded_tx, }) .map_err(|_| Error::ChannelClosed) @@ -120,7 +145,6 @@ impl ExecutionHandle { metadata: Option>>, ) -> napi::Result { use observation_tools_shared::models::Observation; - use observation_tools_shared::models::ObservationId; use observation_tools_shared::models::Payload; use observation_tools_shared::models::SourceInfo; use std::collections::HashMap; diff --git a/crates/observation-tools-client/src/lib.rs b/crates/observation-tools-client/src/lib.rs index 9e09d20..15c557e 100644 --- a/crates/observation-tools-client/src/lib.rs +++ b/crates/observation-tools-client/src/lib.rs @@ -20,6 +20,7 @@ pub use execution::BeginExecution; pub use execution::ExecutionHandle; pub use logger::ObservationLogger; pub use observation::ObservationBuilder; +pub use observation::ObservationBuilderWithPayload; pub use observation_handle::ObservationHandle; pub use observation_handle::SendObservation; // Re-export procedural macro diff --git a/crates/observation-tools-client/src/logger.rs b/crates/observation-tools-client/src/logger.rs index fdad0e5..5990ca2 100644 --- a/crates/observation-tools-client/src/logger.rs +++ b/crates/observation-tools-client/src/logger.rs @@ -37,15 +37,15 @@ impl Log for ObservationLogger { return; } - let mut builder = ObservationBuilder::new("ObservationLogger") + let mut builder = ObservationBuilder::new("ObservationLogger"); + builder .observation_type(ObservationType::LogEntry) .log_level(record.level().into()) - .label(format!("log/{}", record.target())) - .payload(&format!("{}", record.args())); + .label(format!("log/{}", record.target())); if let (Some(file), Some(line)) = (record.file(), record.line()) { - builder = builder.source(file, line); + builder.source(file, line); } - let _ = builder.build(); + let _ = builder.payload(&format!("{}", record.args())).build(); } fn flush(&self) { diff --git a/crates/observation-tools-client/src/observation.rs b/crates/observation-tools-client/src/observation.rs index 2ae54d6..8c8736a 100644 --- a/crates/observation-tools-client/src/observation.rs +++ b/crates/observation-tools-client/src/observation.rs @@ -1,8 +1,8 @@ //! Observation builder API +use crate::client::ObservationUploadResult; use crate::client::UploaderMessage; use crate::context; -use crate::error::Result; use crate::execution::ExecutionHandle; use crate::observation_handle::ObservationHandle; use crate::observation_handle::SendObservation; @@ -15,10 +15,15 @@ use observation_tools_shared::models::Payload; use observation_tools_shared::models::SourceInfo; use observation_tools_shared::IntoCustomPayload; use observation_tools_shared::LogLevel; +use observation_tools_shared::Markdown; use observation_tools_shared::ObservationType; use std::collections::HashMap; -/// Builder for creating observations +/// Builder for creating observations (without payload set yet) +/// +/// Call `.payload()` or `.custom_payload()` to get an +/// `ObservationBuilderWithPayload` that can be built. +#[derive(Clone)] #[napi] pub struct ObservationBuilder { name: String, @@ -26,7 +31,6 @@ pub struct ObservationBuilder { metadata: HashMap, source: Option, parent_span_id: Option, - payload: Option, observation_type: ObservationType, log_level: LogLevel, } @@ -40,32 +44,31 @@ impl ObservationBuilder { metadata: HashMap::new(), source: None, parent_span_id: None, - payload: None, observation_type: ObservationType::Payload, log_level: LogLevel::Info, } } /// Add a label to the observation - pub fn label(mut self, label: impl Into) -> Self { + pub fn label(&mut self, label: impl Into) -> &mut Self { self.labels.push(label.into()); self } /// Add multiple labels to the observation - pub fn labels(mut self, labels: impl IntoIterator>) -> Self { + pub fn labels(&mut self, labels: impl IntoIterator>) -> &mut Self { self.labels.extend(labels.into_iter().map(|l| l.into())); self } /// Add metadata to the observation - pub fn metadata(mut self, key: impl Into, value: impl Into) -> Self { + pub fn metadata(&mut self, key: impl Into, value: impl Into) -> &mut Self { self.metadata.insert(key.into(), value.into()); self } /// Set the source info for the observation - pub fn source(mut self, file: impl Into, line: u32) -> Self { + pub fn source(&mut self, file: impl Into, line: u32) -> &mut Self { self.source = Some(SourceInfo { file: file.into(), line, @@ -75,82 +78,37 @@ impl ObservationBuilder { } /// Set the parent span ID - pub fn parent_span_id(mut self, span_id: impl Into) -> Self { + pub fn parent_span_id(&mut self, span_id: impl Into) -> &mut Self { self.parent_span_id = Some(span_id.into()); self } /// Set the observation type - pub fn observation_type(mut self, observation_type: ObservationType) -> Self { + pub fn observation_type(&mut self, observation_type: ObservationType) -> &mut Self { self.observation_type = observation_type; self } /// Set the log level - pub fn log_level(mut self, log_level: LogLevel) -> Self { + pub fn log_level(&mut self, log_level: LogLevel) -> &mut Self { self.log_level = log_level; self } - pub fn payload(mut self, value: &T) -> Self { - self.payload = Some(value.to_payload()); - self - } - - pub fn custom_payload(mut self, value: &T) -> Self { - self.payload = Some(value.to_payload()); - self - } - - /// Build and send the observation using the current execution context - /// - /// Returns a `SendObservation` which allows you to wait for the observation - /// to be uploaded before proceeding, or to get the observation ID - /// immediately. - pub fn build(self) -> Result { - let execution = context::get_current_execution().ok_or(Error::NoExecutionContext)?; - self.build_with_execution(&execution) + /// Set the payload and return a builder that can be built + pub fn payload(&self, value: &T) -> ObservationBuilderWithPayload { + ObservationBuilderWithPayload { + fields: self.clone(), + payload: value.to_payload(), + } } - /// Build and send the observation using an explicit execution handle - pub fn build_with_execution(self, execution: &ExecutionHandle) -> Result { - let observation_id = ObservationId::new(); - let observation = Observation { - id: observation_id, - execution_id: execution.id(), - name: self.name, - observation_type: self.observation_type, - log_level: self.log_level, - labels: self.labels, - metadata: self.metadata, - source: self.source, - parent_span_id: self.parent_span_id, - payload: self.payload.ok_or(Error::MissingPayload)?, - created_at: chrono::Utc::now(), - }; - let (uploaded_tx, uploaded_rx) = tokio::sync::oneshot::channel(); - // Log before sending so any error comes afterward - log::info!( - "Sending: {}/exe/{}/obs/{}", - execution.base_url(), - execution.id(), - observation_id - ); - execution - .uploader_tx - .try_send(UploaderMessage::Observations { - observations: vec![observation], - uploaded_tx, - }) - .map_err(|_| Error::ChannelClosed)?; - Ok(SendObservation::new( - ObservationHandle { - base_url: execution.base_url().to_string(), - execution_id: execution.id(), - observation_id, - }, - uploaded_rx, - )) + /// Set a custom payload and return a builder that can be built + pub fn custom_payload(&self, value: &T) -> ObservationBuilderWithPayload { + ObservationBuilderWithPayload { + fields: self.clone(), + payload: value.to_payload(), + } } } @@ -159,93 +117,116 @@ impl ObservationBuilder { /// Create a new observation builder with the given name #[napi(constructor)] pub fn new_napi(name: String) -> Self { - Self { - name, - labels: Vec::new(), - metadata: HashMap::new(), - source: None, - parent_span_id: None, - payload: None, - observation_type: ObservationType::Payload, - log_level: LogLevel::Info, - } + Self::new(name) } /// Add a label to the observation #[napi(js_name = "label")] pub fn label_napi(&mut self, label: String) -> &Self { - self.labels.push(label); - self + self.label(label) } /// Add metadata to the observation #[napi(js_name = "metadata")] pub fn metadata_napi(&mut self, key: String, value: String) -> &Self { - self.metadata.insert(key, value); - self + self.metadata(key, value) } /// Set the source info for the observation #[napi(js_name = "source")] pub fn source_napi(&mut self, file: String, line: u32) -> &Self { - self.source = Some(SourceInfo { - file, - line, - column: None, - }); - self + self.source(file, line) } /// Set the payload as JSON data #[napi(js_name = "jsonPayload")] - pub fn json_payload_napi(&mut self, json_string: String) -> napi::Result<&Self> { + pub fn json_payload_napi( + &self, + json_string: String, + ) -> napi::Result { serde_json::from_str::(&json_string) .map_err(|e| napi::Error::from_reason(format!("Invalid JSON payload: {}", e)))?; - self.payload = Some(Payload::json(json_string)); - Ok(self) + Ok(self.payload(&Payload::json(json_string))) } /// Set the payload with custom data and MIME type #[napi(js_name = "rawPayload")] - pub fn raw_payload_napi(&mut self, data: String, mime_type: String) -> &Self { - self.payload = Some(Payload::with_mime_type(data, mime_type)); - self + pub fn raw_payload_napi(&self, data: String, mime_type: String) -> ObservationBuilderWithPayload { + self.payload(&Payload::with_mime_type(data, mime_type)) } /// Set the payload as markdown content #[napi(js_name = "markdownPayload")] - pub fn markdown_payload_napi(&mut self, content: String) -> &Self { - self.payload = Some(Payload::with_mime_type(content, "text/markdown")); - self + pub fn markdown_payload_napi(&self, content: String) -> ObservationBuilderWithPayload { + self.custom_payload(&Markdown::from(content)) } +} - /// Build and send the observation +/// Builder for creating observations (with payload set) +/// +/// This struct is returned by `ObservationBuilder::payload()` and +/// `ObservationBuilder::custom_payload()`. It has the `build()` methods +/// since a payload is required. +#[napi] +pub struct ObservationBuilderWithPayload { + fields: ObservationBuilder, + payload: Payload, +} + +impl ObservationBuilderWithPayload { + /// Build and send the observation using the current execution context /// - /// Returns a SendObservation which allows you to wait for the upload to - /// complete or get the ObservationHandle immediately. - #[napi] - pub fn send(&mut self, execution: &ExecutionHandle) -> napi::Result { + /// Returns a `SendObservation` which allows you to wait for the observation + /// to be uploaded before proceeding, or to get the observation ID + /// immediately. + /// + /// If no execution context is available, logs an error and returns a stub + /// SendObservation that will fail on `wait_for_upload()`. + pub fn build(self) -> SendObservation { + match context::get_current_execution() { + Some(execution) => self.build_with_execution(&execution), + None => { + log::error!( + "No execution context available for observation '{}'", + self.fields.name + ); + SendObservation::stub(Error::NoExecutionContext) + } + } + } + + /// Build and send the observation using an explicit execution handle + /// + /// Returns a `SendObservation` which allows you to wait for the observation + /// to be uploaded. If sending fails, returns a stub that will fail on + /// `wait_for_upload()`. + pub fn build_with_execution(self, execution: &ExecutionHandle) -> SendObservation { let observation_id = ObservationId::new(); + + let handle = ObservationHandle { + base_url: execution.base_url().to_string(), + execution_id: execution.id(), + observation_id, + }; + let observation = Observation { id: observation_id, execution_id: execution.id(), - name: self.name.clone(), - observation_type: self.observation_type, - log_level: self.log_level, - labels: std::mem::take(&mut self.labels), - metadata: std::mem::take(&mut self.metadata), - source: self.source.take(), - parent_span_id: self.parent_span_id.take(), - payload: self - .payload - .take() - .ok_or_else(|| napi::Error::from_reason("Payload is required"))?, + name: self.fields.name, + observation_type: self.fields.observation_type, + log_level: self.fields.log_level, + labels: self.fields.labels, + metadata: self.fields.metadata, + source: self.fields.source, + parent_span_id: self.fields.parent_span_id, + payload: self.payload, created_at: chrono::Utc::now(), }; - let (uploaded_tx, uploaded_rx) = tokio::sync::oneshot::channel(); + let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel::(None); + // Log before sending so any error comes afterward log::info!( "Sending: {}/exe/{}/obs/{}", execution.base_url(), @@ -253,21 +234,36 @@ impl ObservationBuilder { observation_id ); - execution + if let Err(e) = execution .uploader_tx - .try_send(crate::client::UploaderMessage::Observations { + .try_send(UploaderMessage::Observations { observations: vec![observation], + handle: handle.clone(), uploaded_tx, }) - .map_err(|_| napi::Error::from_reason("Channel closed"))?; + { + log::error!("Failed to send observation: {}", e); + return SendObservation::stub(Error::ChannelClosed); + } - Ok(SendObservation::new( - ObservationHandle { - base_url: execution.base_url().to_string(), - execution_id: execution.id(), - observation_id, - }, - uploaded_rx, - )) + SendObservation::new(handle, uploaded_rx) + } +} + +#[napi] +impl ObservationBuilderWithPayload { + /// Build and send the observation + /// + /// Returns a SendObservation which allows you to wait for the upload to + /// complete or get the ObservationHandle immediately. + /// + /// If sending fails, returns a stub that will fail on `wait_for_upload()`. + #[napi] + pub fn send(&self, execution: &ExecutionHandle) -> SendObservation { + let with_payload = ObservationBuilderWithPayload { + fields: self.fields.clone(), + payload: self.payload.clone(), + }; + with_payload.build_with_execution(execution) } } diff --git a/crates/observation-tools-client/src/observation_handle.rs b/crates/observation-tools-client/src/observation_handle.rs index dfb38c8..b38f4f2 100644 --- a/crates/observation-tools-client/src/observation_handle.rs +++ b/crates/observation-tools-client/src/observation_handle.rs @@ -1,30 +1,66 @@ //! Observation handle types +use crate::client::ObservationUploadResult; use crate::error::Result; use crate::Error; use napi_derive::napi; +use observation_tools_shared::models::ExecutionId; +use observation_tools_shared::models::ObservationId; #[napi] pub struct SendObservation { pub(crate) handle: ObservationHandle, - pub(crate) uploaded_rx: Option>, + pub(crate) uploaded_rx: Option>, + /// Error that occurred during creation (for stub observations) + pub(crate) creation_error: Option, } impl SendObservation { pub(crate) fn new( handle: ObservationHandle, - uploaded_rx: tokio::sync::oneshot::Receiver<()>, + uploaded_rx: tokio::sync::watch::Receiver, ) -> Self { Self { handle, uploaded_rx: Some(uploaded_rx), + creation_error: None, } } - pub async fn wait_for_upload(self) -> Result { - let rx = self.uploaded_rx.ok_or(Error::ChannelClosed)?; - rx.await.map_err(|_| Error::ChannelClosed)?; - Ok(self.handle) + /// Create a stub SendObservation that will fail on wait_for_upload() + /// + /// This is used when observation creation fails (e.g., no execution context). + /// The stub allows callers to ignore errors at creation time but still + /// detect failures when explicitly waiting. + pub(crate) fn stub(error: Error) -> Self { + Self { + handle: ObservationHandle::placeholder(), + uploaded_rx: None, + creation_error: Some(error), + } + } + + pub async fn wait_for_upload(mut self) -> Result { + // Return creation error if present + if let Some(err) = self.creation_error { + return Err(err); + } + + // Get receiver (must be mutable for borrow_and_update and changed) + let rx = self.uploaded_rx.as_mut().ok_or(Error::ChannelClosed)?; + + // Wait for value to change from None to Some + loop { + { + let value = rx.borrow_and_update(); + match &*value { + Some(Ok(handle)) => return Ok(handle.clone()), + Some(Err(error_msg)) => return Err(Error::UploadFailed(error_msg.clone())), + None => {} + } + } + rx.changed().await.map_err(|_| Error::ChannelClosed)?; + } } pub fn handle(&self) -> &ObservationHandle { @@ -45,17 +81,26 @@ impl SendObservation { } #[napi] -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ObservationHandle { pub(crate) base_url: String, - pub(crate) observation_id: observation_tools_shared::models::ObservationId, - pub(crate) execution_id: observation_tools_shared::models::ExecutionId, + pub(crate) observation_id: ObservationId, + pub(crate) execution_id: ExecutionId, } impl ObservationHandle { - pub fn id(&self) -> &observation_tools_shared::models::ObservationId { + pub fn id(&self) -> &ObservationId { &self.observation_id } + + /// Create a placeholder handle for stub observations + pub(crate) fn placeholder() -> Self { + Self { + base_url: String::new(), + observation_id: ObservationId::nil(), + execution_id: ExecutionId::nil(), + } + } } #[napi] diff --git a/crates/observation-tools-client/tests/api_integration_test.rs b/crates/observation-tools-client/tests/api_integration_test.rs index 9ad2b5e..55c91cd 100644 --- a/crates/observation-tools-client/tests/api_integration_test.rs +++ b/crates/observation-tools-client/tests/api_integration_test.rs @@ -52,12 +52,12 @@ async fn test_create_observation_with_metadata() -> anyhow::Result<()> { observation_tools_client::with_execution(execution, async { observation_tools_client::ObservationBuilder::new("test-observation") - .payload("test payload data") .label("test/label1") .label("test/label2") .metadata("key1", "value1") .metadata("key2", "value2") - .build()? + .payload("test payload data") + .build() .wait_for_upload() .await?; @@ -107,7 +107,7 @@ async fn test_create_many_observations() -> anyhow::Result<()> { // Create BATCH_SIZE observations to test batching behavior for i in 0..observation_tools_client::BATCH_SIZE { let obs_name = format!("observation-{}", i); - observe!(&obs_name, payload_data)?; + observe!(&obs_name, payload_data); expected_names.insert(obs_name); } @@ -199,7 +199,7 @@ async fn test_concurrent_executions() -> anyhow::Result<()> { name = TASK_1_NAME, label = "concurrent/task1", payload = "data from task 1" - )? + ) .wait_for_upload() .await?; let _ = task1_sender.send(()); @@ -219,7 +219,7 @@ async fn test_concurrent_executions() -> anyhow::Result<()> { name = TASK_2_NAME, label = "concurrent/task2", payload = "data from task 2" - )? + ) .wait_for_upload() .await?; debug!("Task 2 waiting for task 1"); @@ -290,7 +290,7 @@ async fn test_large_payload_blob_upload() -> anyhow::Result<()> { name = "large-observation", label = "test/large-payload", payload = large_payload - )? + ) .wait_for_upload() .await }) diff --git a/crates/observation-tools-client/tests/api_key_integration_test.rs b/crates/observation-tools-client/tests/api_key_integration_test.rs index ecb878d..9c57dcb 100644 --- a/crates/observation-tools-client/tests/api_key_integration_test.rs +++ b/crates/observation-tools-client/tests/api_key_integration_test.rs @@ -44,10 +44,10 @@ async fn test_api_with_valid_key() -> anyhow::Result<()> { observation_tools_client::with_execution(execution, async { observation_tools_client::ObservationBuilder::new("test-observation") - .payload("test payload data") .label("test/label") .metadata("key1", "value1") - .build()? + .payload("test payload data") + .build() .wait_for_upload() .await?; @@ -196,7 +196,7 @@ async fn test_blob_upload_with_auth() -> anyhow::Result<()> { name = "large-observation", label = "test/large-payload", payload = large_payload - )? + ) .wait_for_upload() .await }) @@ -289,7 +289,7 @@ async fn test_observation_without_auth_after_execution_with_auth() -> anyhow::Re let unauth_client = server.create_client()?; let result = observation_tools_client::with_execution(execution, async { - observation_tools_client::observe!(name = "test-observation", payload = "test data")? + observation_tools_client::observe!(name = "test-observation", payload = "test data") .wait_for_upload() .await }) @@ -382,7 +382,7 @@ async fn test_blob_retrieval_without_auth_succeeds() -> anyhow::Result<()> { }); let observation_id = observation_tools_client::with_execution(execution, async { - observation_tools_client::observe!(name = "large-observation", payload = large_payload)? + observation_tools_client::observe!(name = "large-observation", payload = large_payload) .wait_for_upload() .await }) diff --git a/crates/observation-tools-client/tests/logger_test.rs b/crates/observation-tools-client/tests/logger_test.rs index 258c69c..b94e3a7 100644 --- a/crates/observation-tools-client/tests/logger_test.rs +++ b/crates/observation-tools-client/tests/logger_test.rs @@ -20,7 +20,7 @@ async fn test_logger() -> anyhow::Result<()> { let execution_id = execution.id(); observation_tools_client::with_execution(execution, async { log::info!("info-log"); - observe!("direct-log", "direct-log-payload")? + observe!("direct-log", "direct-log-payload") .wait_for_upload() .await?; Ok::<_, anyhow::Error>(()) diff --git a/crates/observation-tools-client/tests/observe_macro_test.rs b/crates/observation-tools-client/tests/observe_macro_test.rs index 7e5b4d8..638d4c3 100644 --- a/crates/observation-tools-client/tests/observe_macro_test.rs +++ b/crates/observation-tools-client/tests/observe_macro_test.rs @@ -17,7 +17,7 @@ async fn test_observe_simple_string_payload() -> anyhow::Result<()> { let execution_id = execution.id(); let observation = observation_tools_client::with_execution(execution, async { - let handle = observe!("simple_string", "hello world")? + let handle = observe!("simple_string", "hello world") .wait_for_upload() .await?; @@ -71,7 +71,7 @@ async fn test_observe_serde_struct() -> anyhow::Result<()> { message: "test message".to_string(), count: 42, } - )? + ) .wait_for_upload() .await?; @@ -129,7 +129,7 @@ async fn test_observe_custom_payload() -> anyhow::Result<()> { message: "custom message".to_string() }, custom_serialization = true - )? + ) .wait_for_upload() .await?; @@ -187,7 +187,7 @@ async fn test_observe_custom_with_new_syntax() -> anyhow::Result<()> { value: "test".to_string() }, custom = true - )? + ) .wait_for_upload() .await?; @@ -230,7 +230,7 @@ async fn test_observe_variable_name_capture() -> anyhow::Result<()> { let observation = observation_tools_client::with_execution(execution, async { let my_data = "captured variable name"; - let handle = observe!(my_data)?.wait_for_upload().await?; + let handle = observe!(my_data).wait_for_upload().await?; Ok::<_, anyhow::Error>(handle) }) @@ -274,7 +274,7 @@ async fn test_observe_structured_syntax() -> anyhow::Result<()> { name = "structured_observation", payload = "test payload", label = "test/category" - )? + ) .wait_for_upload() .await?; @@ -325,7 +325,7 @@ async fn test_observe_metadata_syntax() -> anyhow::Result<()> { status_code: "200", duration_ms: duration_ms.to_string() } - )? + ) .wait_for_upload() .await?; @@ -370,7 +370,7 @@ async fn test_observe_expression_name() -> anyhow::Result<()> { let observation = observation_tools_client::with_execution(execution, async { const OBSERVATION_NAME: &str = "const_name"; - let handle = observe!(name = OBSERVATION_NAME, payload = "test data")? + let handle = observe!(name = OBSERVATION_NAME, payload = "test data") .wait_for_upload() .await?; @@ -413,7 +413,7 @@ async fn test_observe_dynamic_name() -> anyhow::Result<()> { let prefix = "dynamic"; let name = format!("{}_observation", prefix); - let handle = observe!(&name, "test payload")?.wait_for_upload().await?; + let handle = observe!(&name, "test payload").wait_for_upload().await?; Ok::<_, anyhow::Error>(handle) }) @@ -454,7 +454,7 @@ async fn test_observe_dynamic_label() -> anyhow::Result<()> { let endpoint = "users"; let label = format!("api/{}/create", endpoint); - let handle = observe!(name = "request", payload = "data", label = label)? + let handle = observe!(name = "request", payload = "data", label = label) .wait_for_upload() .await?; diff --git a/crates/observation-tools-macros/src/lib.rs b/crates/observation-tools-macros/src/lib.rs index b96c038..4524bf0 100644 --- a/crates/observation-tools-macros/src/lib.rs +++ b/crates/observation-tools-macros/src/lib.rs @@ -267,13 +267,16 @@ pub fn observe(input: TokenStream) -> TokenStream { } }); + // Note: .source(), .label(), and .metadata() must come before .payload() + // because .payload() returns ObservationBuilderWithPayload which only has + // .build() let expanded = quote! { { ::observation_tools_client::ObservationBuilder::new(#name) - .#payload_method(&#payload) .source(#file, #line) #label_call #(#metadata_calls)* + .#payload_method(&#payload) .build() } }; diff --git a/crates/observation-tools-shared/src/models.rs b/crates/observation-tools-shared/src/models.rs index 19d0d9e..9c444ae 100644 --- a/crates/observation-tools-shared/src/models.rs +++ b/crates/observation-tools-shared/src/models.rs @@ -30,6 +30,11 @@ impl ExecutionId { Self(Uuid::now_v7()) } + /// Create a nil (all zeros) execution ID for placeholder use + pub fn nil() -> Self { + Self(Uuid::nil()) + } + /// Parse from a string pub fn parse(s: &str) -> crate::Result { Ok(Self(Uuid::parse_str(s)?)) @@ -69,6 +74,11 @@ impl ObservationId { Self(Uuid::now_v7()) } + /// Create a nil (all zeros) observation ID for placeholder use + pub fn nil() -> Self { + Self(Uuid::nil()) + } + /// Parse from a string pub fn parse(s: &str) -> crate::Result { let uuid = Uuid::parse_str(s).map_err(crate::Error::InvalidObservationId)?;