Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ utoipa = { version = "5", features = ["axum_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "9", features = ["axum"] }
uuid = { version = "1.11", features = ["v4", "v7", "serde", "js"] }
utoipa-axum = { version = "0.2"}
pulldown-cmark = "0.12"
ammonia = "4"
33 changes: 33 additions & 0 deletions crates/observation-tools-client/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@ export declare class ExecutionHandle {
): string;
}

/** Builder for creating observations */
export declare class ObservationBuilder {
/** Create a new observation builder with the given name */
constructor(name: string);
/** Add a label to the observation */
label(label: string): this;
/** Add metadata to the observation */
metadata(key: string, value: string): this;
/** Set the source info for the observation */
source(file: string, line: number): this;
/** Set the payload as JSON data */
jsonPayload(jsonString: string): this;
/** Set the payload with custom data and MIME type */
rawPayload(data: string, mimeType: string): this;
/** Set the payload as markdown content */
markdownPayload(content: string): this;
/**
* Build and send the observation
*
* Returns a SendObservation which allows you to wait for the upload to complete
* or get the ObservationHandle immediately.
*/
send(execution: ExecutionHandle): SendObservation;
}

export declare class ObservationHandle {
get url(): string;
}

export declare class SendObservation {
handle(): ObservationHandle;
}

/**
* Generate a new execution ID (for testing)
*
Expand Down
3 changes: 3 additions & 0 deletions crates/observation-tools-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -764,4 +764,7 @@ module.exports = nativeBinding;
module.exports.Client = nativeBinding.Client;
module.exports.ClientBuilder = nativeBinding.ClientBuilder;
module.exports.ExecutionHandle = nativeBinding.ExecutionHandle;
module.exports.ObservationBuilder = nativeBinding.ObservationBuilder;
module.exports.ObservationHandle = nativeBinding.ObservationHandle;
module.exports.SendObservation = nativeBinding.SendObservation;
module.exports.generateExecutionId = nativeBinding.generateExecutionId;
77 changes: 39 additions & 38 deletions crates/observation-tools-client/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,68 @@ use napi_derive::napi;
use observation_tools_shared::models::ExecutionId;
use observation_tools_shared::models::Observation;

/// Result of beginning a new execution
///
/// This type is returned when you start a new execution and allows you to
/// wait for the execution to be uploaded to the server before proceeding.
pub struct BeginExecution {
handle: ExecutionHandle,
uploaded_rx: tokio::sync::oneshot::Receiver<()>,
}

/// Result of sending observation(s)
///
/// This type is returned when you send observations and allows you to
/// wait for the observations to be uploaded to the server before proceeding.
#[napi]
pub struct SendObservation {
pub(crate) handle: ObservationHandle,
pub(crate) uploaded_rx: tokio::sync::oneshot::Receiver<()>,
pub(crate) uploaded_rx: Option<tokio::sync::oneshot::Receiver<()>>,
}

impl SendObservation {
/// Wait for the observations to be uploaded to the server
///
/// This consumes the SendObservation and returns the observation ID
/// after the observations have been successfully uploaded.
///
/// # Returns
/// - `Ok(ObservationId)` if the observations were successfully uploaded
/// - `Err(Error::ChannelClosed)` if the upload task failed
pub(crate) fn new(
handle: ObservationHandle,
uploaded_rx: tokio::sync::oneshot::Receiver<()>,
) -> Self {
Self {
handle,
uploaded_rx: Some(uploaded_rx),
}
}

pub async fn wait_for_upload(self) -> Result<ObservationHandle> {
self.uploaded_rx.await.map_err(|_| Error::ChannelClosed)?;
let rx = self.uploaded_rx.ok_or(Error::ChannelClosed)?;
rx.await.map_err(|_| Error::ChannelClosed)?;
Ok(self.handle)
}

pub fn handle(&self) -> &ObservationHandle {
&self.handle
}

pub fn into_handle(self) -> ObservationHandle {
self.handle
}
}

#[napi]
impl SendObservation {
#[napi(js_name = "handle")]
pub fn handle_napi(&self) -> ObservationHandle {
self.handle.clone()
}
}

#[napi]
#[derive(Clone)]
pub struct ObservationHandle {
pub(crate) base_url: String,
pub(crate) observation_id: observation_tools_shared::models::ObservationId,
pub(crate) execution_id: observation_tools_shared::models::ExecutionId,
}

impl ObservationHandle {
/// Get the observation ID
pub fn id(&self) -> &observation_tools_shared::models::ObservationId {
&self.observation_id
}
}

/// Get the URL to the observation page
#[napi]
impl ObservationHandle {
#[napi(getter)]
pub fn url(&self) -> String {
format!(
"{}/exe/{}/obs/{}",
Expand All @@ -62,9 +78,9 @@ impl ObservationHandle {
}
}

impl Into<ObservationHandle> for SendObservation {
fn into(self) -> ObservationHandle {
self.handle
impl From<SendObservation> for ObservationHandle {
fn from(send: SendObservation) -> ObservationHandle {
send.into_handle()
}
}

Expand All @@ -80,30 +96,15 @@ impl BeginExecution {
}

/// Wait for the execution to be uploaded to the server
///
/// This consumes the BeginExecution and returns the ExecutionHandle
/// after the execution has been successfully uploaded.
///
/// # Returns
/// - `Ok(ExecutionHandle)` if the execution was successfully uploaded
/// - `Err(Error::ChannelClosed)` if the upload task failed
pub async fn wait_for_upload(self) -> Result<ExecutionHandle> {
self.uploaded_rx.await.map_err(|_| Error::ChannelClosed)?;
Ok(self.handle)
}

/// Get a reference to the execution handle without waiting for upload
///
/// This is useful if you want to start sending observations immediately
/// without waiting for the execution creation to complete.
pub fn handle(&self) -> &ExecutionHandle {
&self.handle
}

/// Consume this and return the execution handle without waiting for upload
///
/// This is useful if you don't care about waiting for the execution
/// to be uploaded before proceeding.
pub fn into_handle(self) -> ExecutionHandle {
self.handle
}
Expand Down
132 changes: 129 additions & 3 deletions crates/observation-tools-client/src/observation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use crate::client::UploaderMessage;
use crate::context;
use crate::error::Result;
use crate::execution::ExecutionHandle;
use crate::execution::ObservationHandle;
use crate::execution::SendObservation;
use crate::Error;
use napi_derive::napi;
use observation_tools_shared::models::IntoPayload;
use observation_tools_shared::models::Observation;
use observation_tools_shared::models::ObservationId;
Expand All @@ -17,6 +19,7 @@ use observation_tools_shared::ObservationType;
use std::collections::HashMap;

/// Builder for creating observations
#[napi]
pub struct ObservationBuilder {
name: String,
labels: Vec<String>,
Expand Down Expand Up @@ -106,6 +109,11 @@ impl ObservationBuilder {
/// immediately.
pub fn build(self) -> Result<SendObservation> {
let execution = context::get_current_execution().ok_or(Error::NoExecutionContext)?;
self.build_with_execution(&execution)
}

/// Build and send the observation using an explicit execution handle
pub fn build_with_execution(self, execution: &ExecutionHandle) -> Result<SendObservation> {
let observation_id = ObservationId::new();
let observation = Observation {
id: observation_id,
Expand Down Expand Up @@ -135,13 +143,131 @@ impl ObservationBuilder {
uploaded_tx,
})
.map_err(|_| Error::ChannelClosed)?;
Ok(SendObservation {
handle: ObservationHandle {
Ok(SendObservation::new(
ObservationHandle {
base_url: execution.base_url().to_string(),
execution_id: execution.id(),
observation_id,
},
uploaded_rx,
))
}
}

#[napi]
impl ObservationBuilder {
/// Create a new observation builder with the given name
#[napi(constructor)]
pub fn new_napi(name: String) -> Self {
Self {
name,
labels: Vec::new(),
metadata: HashMap::new(),
source: None,
parent_span_id: None,
payload: None,
observation_type: ObservationType::Payload,
log_level: LogLevel::Info,
}
}

/// Add a label to the observation
#[napi(js_name = "label")]
pub fn label_napi(&mut self, label: String) -> &Self {
self.labels.push(label);
self
}

/// Add metadata to the observation
#[napi(js_name = "metadata")]
pub fn metadata_napi(&mut self, key: String, value: String) -> &Self {
self.metadata.insert(key, value);
self
}

/// Set the source info for the observation
#[napi(js_name = "source")]
pub fn source_napi(&mut self, file: String, line: u32) -> &Self {
self.source = Some(SourceInfo {
file,
line,
column: None,
});
self
}

/// Set the payload as JSON data
#[napi(js_name = "jsonPayload")]
pub fn json_payload_napi(&mut self, json_string: String) -> napi::Result<&Self> {
serde_json::from_str::<serde_json::Value>(&json_string)
.map_err(|e| napi::Error::from_reason(format!("Invalid JSON payload: {}", e)))?;

self.payload = Some(Payload::json(json_string));
Ok(self)
}

/// Set the payload with custom data and MIME type
#[napi(js_name = "rawPayload")]
pub fn raw_payload_napi(&mut self, data: String, mime_type: String) -> &Self {
self.payload = Some(Payload::with_mime_type(data, mime_type));
self
}

/// Set the payload as markdown content
#[napi(js_name = "markdownPayload")]
pub fn markdown_payload_napi(&mut self, content: String) -> &Self {
self.payload = Some(Payload::with_mime_type(content, "text/markdown"));
self
}

/// Build and send the observation
///
/// Returns a SendObservation which allows you to wait for the upload to
/// complete or get the ObservationHandle immediately.
#[napi]
pub fn send(&mut self, execution: &ExecutionHandle) -> napi::Result<SendObservation> {
let observation_id = ObservationId::new();
let observation = Observation {
id: observation_id,
execution_id: execution.id(),
name: self.name.clone(),
observation_type: self.observation_type,
log_level: self.log_level,
labels: std::mem::take(&mut self.labels),
metadata: std::mem::take(&mut self.metadata),
source: self.source.take(),
parent_span_id: self.parent_span_id.take(),
payload: self
.payload
.take()
.ok_or_else(|| napi::Error::from_reason("Payload is required"))?,
created_at: chrono::Utc::now(),
};

let (uploaded_tx, uploaded_rx) = tokio::sync::oneshot::channel();

log::info!(
"Sending: {}/exe/{}/obs/{}",
execution.base_url(),
execution.id(),
observation_id
);

execution
.uploader_tx
.try_send(crate::client::UploaderMessage::Observations {
observations: vec![observation],
uploaded_tx,
})
.map_err(|_| napi::Error::from_reason("Channel closed"))?;

Ok(SendObservation::new(
ObservationHandle {
base_url: execution.base_url().to_string(),
execution_id: execution.id(),
observation_id,
},
uploaded_rx,
})
))
}
}
2 changes: 2 additions & 0 deletions crates/observation-tools-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ utoipa.workspace = true
utoipa-axum.workspace = true
utoipa-swagger-ui.workspace = true
uuid.workspace = true
pulldown-cmark.workspace = true
ammonia.workspace = true

[build-dependencies]
minijinja-embed.workspace = true
Expand Down
Loading