Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions crates/observation-tools-client/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ export declare class ExecutionHandle {
): string;
}

/** Builder for creating observations */
/**
* Builder for creating observations (without payload set yet)
*
* Call `.payload()` or `.custom_payload()` to get an
* `ObservationBuilderWithPayload` that can be built.
*/
export declare class ObservationBuilder {
/** Create a new observation builder with the given name */
constructor(name: string);
Expand All @@ -62,16 +67,28 @@ export declare class ObservationBuilder {
/** Set the source info for the observation */
source(file: string, line: number): this;
/** Set the payload as JSON data */
jsonPayload(jsonString: string): this;
jsonPayload(jsonString: string): ObservationBuilderWithPayload;
/** Set the payload with custom data and MIME type */
rawPayload(data: string, mimeType: string): this;
rawPayload(data: string, mimeType: string): ObservationBuilderWithPayload;
/** Set the payload as markdown content */
markdownPayload(content: string): this;
markdownPayload(content: string): ObservationBuilderWithPayload;
}

/**
* Builder for creating observations (with payload set)
*
* This struct is returned by `ObservationBuilder::payload()` and
* `ObservationBuilder::custom_payload()`. It has the `build()` methods
* since a payload is required.
*/
export declare class ObservationBuilderWithPayload {
/**
* Build and send the observation
*
* Returns a SendObservation which allows you to wait for the upload to
* complete or get the ObservationHandle immediately.
*
* If sending fails, returns a stub that will fail on `wait_for_upload()`.
*/
send(execution: ExecutionHandle): SendObservation;
}
Expand Down
1 change: 1 addition & 0 deletions crates/observation-tools-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ module.exports.Client = nativeBinding.Client;
module.exports.ClientBuilder = nativeBinding.ClientBuilder;
module.exports.ExecutionHandle = nativeBinding.ExecutionHandle;
module.exports.ObservationBuilder = nativeBinding.ObservationBuilder;
module.exports.ObservationBuilderWithPayload = nativeBinding.ObservationBuilderWithPayload;
module.exports.ObservationHandle = nativeBinding.ObservationHandle;
module.exports.SendObservation = nativeBinding.SendObservation;
module.exports.generateExecutionId = nativeBinding.generateExecutionId;
95 changes: 63 additions & 32 deletions crates/observation-tools-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::error::Result;
use crate::execution::BeginExecution;
use crate::execution::ExecutionHandle;
use crate::observation_handle::ObservationHandle;
use async_channel;
use log::error;
use log::info;
Expand All @@ -15,31 +16,46 @@ pub use observation_tools_shared::BATCH_SIZE;
pub use observation_tools_shared::BLOB_THRESHOLD_BYTES;
use std::sync::Arc;

/// Result type for observation upload completion notifications via watch
/// channel Uses String for error since crate::Error doesn't implement Clone
pub(crate) type ObservationUploadResult = Option<std::result::Result<ObservationHandle, String>>;

/// Result type for execution upload completion notifications via watch channel
/// Uses String for error since crate::Error doesn't implement Clone
pub(crate) type ExecutionUploadResult = Option<std::result::Result<ExecutionHandle, String>>;

/// Message types for the background uploader task
pub(crate) enum UploaderMessage {
Execution {
execution: Execution,
uploaded_tx: tokio::sync::oneshot::Sender<()>,
handle: ExecutionHandle,
uploaded_tx: tokio::sync::watch::Sender<ExecutionUploadResult>,
},
Observations {
observations: Vec<Observation>,
uploaded_tx: tokio::sync::oneshot::Sender<()>,
handle: ObservationHandle,
uploaded_tx: tokio::sync::watch::Sender<ObservationUploadResult>,
},
Flush,
Shutdown,
}

// Manual Debug implementation since oneshot::Sender doesn't implement Debug
// Manual Debug implementation since watch::Sender doesn't implement Debug
impl std::fmt::Debug for UploaderMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Execution { execution, .. } => f
.debug_struct("Execution")
.field("execution", execution)
.finish(),
Self::Observations { observations, .. } => f
Self::Observations {
observations,
handle,
..
} => f
.debug_struct("Observations")
.field("observations", observations)
.field("handle", handle)
.finish(),
Self::Flush => write!(f, "Flush"),
Self::Shutdown => write!(f, "Shutdown"),
Expand Down Expand Up @@ -114,19 +130,20 @@ impl Client {

fn begin_execution_internal(&self, execution: Execution) -> Result<BeginExecution> {
trace!("Beginning new execution with ID {}", execution.id);
let (uploaded_tx, uploaded_rx) = tokio::sync::oneshot::channel();
let handle = ExecutionHandle::new(
execution.id,
self.inner.uploader_tx.clone(),
self.inner.base_url.clone(),
);
let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel(None);
self
.inner
.uploader_tx
.try_send(UploaderMessage::Execution {
execution: execution.clone(),
handle: handle.clone(),
uploaded_tx,
})?;
let handle = ExecutionHandle::new(
execution.id,
self.inner.uploader_tx.clone(),
self.inner.base_url.clone(),
);
Ok(BeginExecution::new(handle, uploaded_rx))
}

Expand Down Expand Up @@ -256,52 +273,66 @@ async fn uploader_task(
rx: async_channel::Receiver<UploaderMessage>,
) {
info!("Uploader task started");
let flush_observations =
async |buffer: &mut Vec<Observation>, senders: &mut Vec<tokio::sync::oneshot::Sender<()>>| {
if buffer.is_empty() {
return;
}
let result = upload_observations(&api_client, buffer.drain(..).collect()).await;
match result {
Ok(()) => {
// Signal all senders that observations were uploaded
for sender in senders.drain(..) {
let _ = sender.send(());
}

// Buffer type for observation senders: (handle, sender)
type ObservationSender = (
ObservationHandle,
tokio::sync::watch::Sender<ObservationUploadResult>,
);

let flush_observations = async |buffer: &mut Vec<Observation>,
senders: &mut Vec<ObservationSender>| {
if buffer.is_empty() {
return;
}
let result = upload_observations(&api_client, buffer.drain(..).collect()).await;
match result {
Ok(()) => {
// Signal all senders that observations were uploaded successfully
for (handle, sender) in senders.drain(..) {
let _ = sender.send(Some(Ok(handle)));
}
Err(e) => {
error!("Failed to upload observations: {}", e);
// Clear senders on error (they won't receive notification)
senders.clear();
}
Err(e) => {
let error_msg = e.to_string();
error!("Failed to upload observations: {}", error_msg);
// Signal all senders with the error (as String for Clone compatibility)
for (_, sender) in senders.drain(..) {
let _ = sender.send(Some(Err(error_msg.clone())));
}
}
};
}
};
let mut observation_buffer: Vec<Observation> = Vec::new();
let mut sender_buffer: Vec<tokio::sync::oneshot::Sender<()>> = Vec::new();
let mut sender_buffer: Vec<ObservationSender> = Vec::new();
loop {
let msg = rx.recv().await.ok();
match msg {
Some(UploaderMessage::Execution {
execution,
handle,
uploaded_tx,
}) => {
let result = upload_execution(&api_client, execution).await;
match result {
Ok(()) => {
// Signal successful upload
let _ = uploaded_tx.send(());
// Signal successful upload with handle
let _ = uploaded_tx.send(Some(Ok(handle)));
}
Err(e) => {
error!("Failed to upload execution: {}", e);
let error_msg = e.to_string();
error!("Failed to upload execution: {}", error_msg);
let _ = uploaded_tx.send(Some(Err(error_msg)));
}
}
}
Some(UploaderMessage::Observations {
observations,
handle,
uploaded_tx,
}) => {
observation_buffer.extend(observations);
sender_buffer.push(uploaded_tx);
sender_buffer.push((handle, uploaded_tx));
if observation_buffer.len() >= BATCH_SIZE {
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
}
Expand Down
4 changes: 4 additions & 0 deletions crates/observation-tools-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub enum Error {

#[error("Failed to send uploader message: {0}")]
TrySendError(String),

/// Upload failed with error
#[error("Upload failed: {0}")]
UploadFailed(String),
}

impl From<TrySendError<UploaderMessage>> for Error {
Expand Down
42 changes: 33 additions & 9 deletions crates/observation-tools-client/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
//! Execution handle for managing observation context

use crate::client::ExecutionUploadResult;
use crate::client::ObservationUploadResult;
use crate::client::UploaderMessage;
use crate::error::Result;
use crate::observation_handle::ObservationHandle;
use crate::Error;
use async_channel;
use napi_derive::napi;
use observation_tools_shared::models::ExecutionId;
use observation_tools_shared::models::Observation;
use observation_tools_shared::models::ObservationId;

pub struct BeginExecution {
handle: ExecutionHandle,
uploaded_rx: tokio::sync::oneshot::Receiver<()>,
uploaded_rx: tokio::sync::watch::Receiver<ExecutionUploadResult>,
}

impl BeginExecution {
pub(crate) fn new(
handle: ExecutionHandle,
uploaded_rx: tokio::sync::oneshot::Receiver<()>,
uploaded_rx: tokio::sync::watch::Receiver<ExecutionUploadResult>,
) -> Self {
Self {
handle,
Expand All @@ -25,9 +29,23 @@ impl BeginExecution {
}

/// Wait for the execution to be uploaded to the server
pub async fn wait_for_upload(self) -> Result<ExecutionHandle> {
self.uploaded_rx.await.map_err(|_| Error::ChannelClosed)?;
Ok(self.handle)
pub async fn wait_for_upload(mut self) -> Result<ExecutionHandle> {
// Wait for value to change from None to Some
loop {
{
let value = self.uploaded_rx.borrow_and_update();
match &*value {
Some(Ok(handle)) => return Ok(handle.clone()),
Some(Err(error_msg)) => return Err(Error::UploadFailed(error_msg.clone())),
None => {}
}
}
self
.uploaded_rx
.changed()
.await
.map_err(|_| Error::ChannelClosed)?;
}
}

pub fn handle(&self) -> &ExecutionHandle {
Expand All @@ -41,7 +59,7 @@ impl BeginExecution {

/// Handle to an execution that can be used to send observations
#[napi]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ExecutionHandle {
pub(crate) execution_id: ExecutionId,
pub(crate) uploader_tx: async_channel::Sender<UploaderMessage>,
Expand Down Expand Up @@ -73,13 +91,20 @@ impl ExecutionHandle {

/// Send an observation (internal use, doesn't wait for upload)
pub(crate) fn send_observation(&self, observation: Observation) -> Result<()> {
// Create a oneshot channel but drop the receiver since we don't wait
let (uploaded_tx, _uploaded_rx) = tokio::sync::oneshot::channel();
// Create a watch channel but drop the receiver since we don't wait
let (uploaded_tx, _uploaded_rx) = tokio::sync::watch::channel::<ObservationUploadResult>(None);

let handle = ObservationHandle {
base_url: self.base_url.clone(),
observation_id: observation.id,
execution_id: self.execution_id,
};

self
.uploader_tx
.try_send(UploaderMessage::Observations {
observations: vec![observation],
handle,
uploaded_tx,
})
.map_err(|_| Error::ChannelClosed)
Expand Down Expand Up @@ -120,7 +145,6 @@ impl ExecutionHandle {
metadata: Option<Vec<Vec<String>>>,
) -> napi::Result<String> {
use observation_tools_shared::models::Observation;
use observation_tools_shared::models::ObservationId;
use observation_tools_shared::models::Payload;
use observation_tools_shared::models::SourceInfo;
use std::collections::HashMap;
Expand Down
1 change: 1 addition & 0 deletions crates/observation-tools-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use execution::BeginExecution;
pub use execution::ExecutionHandle;
pub use logger::ObservationLogger;
pub use observation::ObservationBuilder;
pub use observation::ObservationBuilderWithPayload;
pub use observation_handle::ObservationHandle;
pub use observation_handle::SendObservation;
// Re-export procedural macro
Expand Down
10 changes: 5 additions & 5 deletions crates/observation-tools-client/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl Log for ObservationLogger {
return;
}

let mut builder = ObservationBuilder::new("ObservationLogger")
let mut builder = ObservationBuilder::new("ObservationLogger");
builder
.observation_type(ObservationType::LogEntry)
.log_level(record.level().into())
.label(format!("log/{}", record.target()))
.payload(&format!("{}", record.args()));
.label(format!("log/{}", record.target()));
if let (Some(file), Some(line)) = (record.file(), record.line()) {
builder = builder.source(file, line);
builder.source(file, line);
}
let _ = builder.build();
let _ = builder.payload(&format!("{}", record.args())).build();
}

fn flush(&self) {
Expand Down
Loading