Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 86 additions & 8 deletions crates/client/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Contains errors that can be returned by clients.

use http::uri::InvalidUri;
use std::{error::Error, fmt};
use temporalio_common::{
data_converters::PayloadConversionError,
protos::temporal::api::{common::v1::Payload, failure::v1::Failure, query::v1::QueryRejected},
data_converters::{PayloadConversionError, TemporalError},
protos::temporal::api::{common::v1::Payload, query::v1::QueryRejected},
};
use tonic::Code;

Expand Down Expand Up @@ -123,8 +124,8 @@ pub enum WorkflowUpdateError {
NotFound(#[source] tonic::Status),

/// The update failed with an application-level failure.
#[error("Update failed: {0:?}")]
Failed(Box<Failure>),
#[error("Update failed: {}", TemporalErrorChain(.0))]
Failed(#[source] TemporalError),

/// Error serializing input or deserializing output.
#[error("Payload conversion error: {0}")]
Expand Down Expand Up @@ -154,8 +155,8 @@ impl WorkflowUpdateError {
#[non_exhaustive]
pub enum WorkflowGetResultError {
/// The workflow finished in failure.
#[error("Workflow failed: {0:?}")]
Failed(Box<Failure>),
#[error("Workflow failed: {}", TemporalErrorChain(.0))]
Failed(#[source] TemporalError),

/// The workflow was cancelled.
#[error("Workflow cancelled")]
Expand All @@ -173,7 +174,7 @@ pub enum WorkflowGetResultError {

/// The workflow timed out.
#[error("Workflow timed out")]
TimedOut,
Timeout,

/// The workflow continued as new.
#[error("Workflow continued as new")]
Expand Down Expand Up @@ -217,12 +218,26 @@ impl WorkflowGetResultError {
Self::Failed(_)
| Self::Cancelled { .. }
| Self::Terminated { .. }
| Self::TimedOut
| Self::Timeout
| Self::ContinuedAsNew
)
}
}

struct TemporalErrorChain<'a>(&'a TemporalError);

impl fmt::Display for TemporalErrorChain<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)?;
let mut source = self.0.source();
while let Some(err) = source {
write!(f, ": {err}")?;
source = err.source();
}
Ok(())
}
}

/// Errors returned by client methods that don't need more specific error types.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
Expand Down Expand Up @@ -292,3 +307,66 @@ impl AsyncActivityError {
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClientNewError {}

#[cfg(test)]
mod tests {
use super::{WorkflowGetResultError, WorkflowUpdateError};
use temporalio_common::{
data_converters::TemporalError, protos::temporal::api::enums::v1::RetryState,
};

#[test]
fn workflow_get_result_error_includes_nested_activity_cause_message() {
let error = WorkflowGetResultError::Failed(TemporalError::Activity {
message: "Activity task failed".into(),
stack_trace: String::new(),
scheduled_event_id: 1,
started_event_id: 2,
identity: "worker".into(),
activity_type: "test-activity".into(),
activity_id: "activity-id".into(),
retry_state: RetryState::NonRetryableFailure,
cause: Some(Box::new(TemporalError::Application {
message: "boom".into(),
stack_trace: String::new(),
r#type: "TestError".into(),
non_retryable: false,
details: None,
next_retry_delay: None,
cause: None,
})),
});

let rendered = error.to_string();
assert!(rendered.contains("Workflow failed: Activity task failed"));
assert!(rendered.contains("boom"));
}

#[test]
fn workflow_update_error_includes_nested_child_workflow_cause_message() {
let error = WorkflowUpdateError::Failed(TemporalError::ChildWorkflow {
message: "Child workflow task failed".into(),
stack_trace: String::new(),
namespace: "default".into(),
workflow_id: "child-id".into(),
run_id: "child-run".into(),
workflow_type: "child-type".into(),
initiated_event_id: 3,
started_event_id: 4,
retry_state: RetryState::InProgress,
cause: Some(Box::new(TemporalError::Application {
message: "child boom".into(),
stack_trace: String::new(),
r#type: "ChildError".into(),
non_retryable: false,
details: None,
next_retry_delay: None,
cause: None,
})),
});

let rendered = error.to_string();
assert!(rendered.contains("Update failed: Child workflow task failed"));
assert!(rendered.contains("child boom"));
}
}
6 changes: 3 additions & 3 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ pub use async_activity_handle::{
ActivityHeartbeatResponse, ActivityIdentifier, AsyncActivityHandle,
};

pub use errors::*;
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use options_structs::*;
pub use replaceable::SharedReplaceableClient;
pub use retry::RetryOptions;
pub use tonic;
pub use workflow_handle::{
UntypedQuery, UntypedSignal, UntypedUpdate, UntypedWorkflow, UntypedWorkflowHandle,
WorkflowExecutionDescription, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,
WorkflowHistory, WorkflowUpdateHandle,
WorkflowExecutionDescription, WorkflowExecutionInfo, WorkflowHandle, WorkflowHistory,
WorkflowUpdateHandle,
};

use crate::{
Expand All @@ -55,7 +56,6 @@ use crate::{
request_extensions::RequestExt,
worker::ClientWorkerSet,
};
use errors::*;
use futures_util::{stream, stream::Stream};
use http::Uri;
use parking_lot::RwLock;
Expand Down
20 changes: 16 additions & 4 deletions crates/client/src/workflow_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use uuid::Uuid;
/// Enumerates terminal states for a particular workflow execution
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum WorkflowExecutionResult<T> {
pub(crate) enum WorkflowExecutionResult<T> {
/// The workflow finished successfully
Succeeded(T),
/// The workflow finished in failure
Expand Down Expand Up @@ -267,14 +267,21 @@ where
let raw = self.get_result_raw(opts).await?;
match raw {
WorkflowExecutionResult::Succeeded(v) => Ok(v),
WorkflowExecutionResult::Failed(f) => Err(WorkflowGetResultError::Failed(Box::new(f))),
WorkflowExecutionResult::Failed(f) => {
let err = self
.client
.data_converter()
.decode_failure(f, &SerializationContextData::Workflow)
.await;
Err(WorkflowGetResultError::Failed(err))
}
WorkflowExecutionResult::Cancelled { details } => {
Err(WorkflowGetResultError::Cancelled { details })
}
WorkflowExecutionResult::Terminated { details } => {
Err(WorkflowGetResultError::Terminated { details })
}
WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::TimedOut),
WorkflowExecutionResult::TimedOut => Err(WorkflowGetResultError::Timeout),
WorkflowExecutionResult::ContinuedAsNew => Err(WorkflowGetResultError::ContinuedAsNew),
}
}
Expand Down Expand Up @@ -805,7 +812,12 @@ where
.await
.map_err(WorkflowUpdateError::from),
Some(update::v1::outcome::Value::Failure(failure)) => {
Err(WorkflowUpdateError::Failed(Box::new(failure)))
let err = self
.client
.data_converter()
.decode_failure(failure, &SerializationContextData::Workflow)
.await;
Err(WorkflowUpdateError::Failed(err))
}
None => Err(WorkflowUpdateError::Other(
"Update returned no outcome value".into(),
Expand Down
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ pbjson-build = { workspace = true }
workspace = true

[dev-dependencies]
assert_matches = "1.5"
futures-util = { version = "0.3", default-features = false }
rstest = "0.26"
tempfile = "3.21"
tokio = { version = "1.47", features = ["macros", "rt"] }
Loading
Loading