Skip to content

Commit 53b10fb

Browse files
feat(client): ergonomic workflow description
1 parent c78bfb6 commit 53b10fb

3 files changed

Lines changed: 396 additions & 34 deletions

File tree

crates/client/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ tracing = "0.1"
4242
url = "2.5"
4343
uuid = { version = "1.18", features = ["v4"] }
4444
rand = "0.10"
45-
serde_json = { workspace = true }
4645

4746
[dependencies.temporalio-common]
4847
path = "../common"

crates/client/src/workflow_handle.rs

Lines changed: 306 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ use std::{fmt::Debug, marker::PhantomData};
1212
pub use temporalio_common::UntypedWorkflow;
1313
use temporalio_common::{
1414
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15-
data_converters::{RawValue, SerializationContextData},
15+
data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData},
16+
payload_visitor::decode_payloads,
1617
protos::{
1718
coresdk::FromPayloadsExt,
19+
proto_ts_to_system_time,
1820
temporal::api::{
1921
common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
2022
enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
@@ -24,7 +26,9 @@ use temporalio_common::{
2426
v1::{HistoryEvent, history_event::Attributes},
2527
},
2628
query::v1::WorkflowQuery,
29+
sdk::v1::UserMetadata,
2730
update::{self, v1::WaitPolicy},
31+
workflow::v1 as workflow,
2832
workflowservice::v1::{
2933
DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse,
3034
GetWorkflowExecutionHistoryRequest, PollWorkflowExecutionUpdateRequest,
@@ -38,6 +42,32 @@ use temporalio_common::{
3842
use tonic::IntoRequest;
3943
use uuid::Uuid;
4044

45+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
46+
struct DecodedUserMetadata {
47+
summary: Option<String>,
48+
details: Option<String>,
49+
}
50+
51+
async fn decode_user_metadata(
52+
data_converter: &DataConverter,
53+
context: &SerializationContextData,
54+
user_metadata: Option<UserMetadata>,
55+
) -> Result<DecodedUserMetadata, PayloadConversionError> {
56+
let (summary, details) = user_metadata
57+
.map(|metadata| (metadata.summary, metadata.details))
58+
.unwrap_or_default();
59+
Ok(DecodedUserMetadata {
60+
summary: match summary {
61+
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
62+
None => None,
63+
},
64+
details: match details {
65+
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
66+
None => None,
67+
},
68+
})
69+
}
70+
4171
/// Enumerates terminal states for a particular workflow execution
4272
#[derive(Debug)]
4373
#[allow(clippy::large_enum_variant)]
@@ -63,43 +93,173 @@ pub enum WorkflowExecutionResult<T> {
6393
}
6494

6595
/// Description of a workflow execution returned by `WorkflowHandle::describe`.
96+
///
97+
/// Access to the underlying Protobuf message is provided by [`raw`](Self::raw).
6698
#[derive(Debug, Clone)]
6799
pub struct WorkflowExecutionDescription {
68-
/// The raw proto response from the server.
100+
/// The raw proto response from the server after payloads have been decoded
101+
/// through the client's codec.
69102
pub raw_description: DescribeWorkflowExecutionResponse,
103+
history_length: usize,
104+
static_summary: Option<String>,
105+
static_details: Option<String>,
70106
}
71107

72108
impl WorkflowExecutionDescription {
73-
fn new(raw_description: DescribeWorkflowExecutionResponse) -> Self {
74-
Self { raw_description }
109+
async fn new(
110+
mut raw_description: DescribeWorkflowExecutionResponse,
111+
data_converter: &DataConverter,
112+
) -> Result<Self, PayloadConversionError> {
113+
let raw_user_metadata = raw_description
114+
.execution_config
115+
.as_ref()
116+
.and_then(|cfg| cfg.user_metadata.clone());
117+
decode_payloads(
118+
&mut raw_description,
119+
data_converter.codec(),
120+
&SerializationContextData::Workflow,
121+
)
122+
.await;
123+
let decoded_metadata = decode_user_metadata(
124+
data_converter,
125+
&SerializationContextData::Workflow,
126+
raw_user_metadata,
127+
)
128+
.await?;
129+
let history_length_raw = raw_description
130+
.workflow_execution_info
131+
.as_ref()
132+
.map(|info| info.history_length)
133+
.unwrap_or(0);
134+
let history_length = history_length_raw.try_into().map_err(|_| {
135+
PayloadConversionError::EncodingError(
136+
format!("workflow history_length must be non-negative, got {history_length_raw}")
137+
.into(),
138+
)
139+
})?;
140+
Ok(Self {
141+
raw_description,
142+
history_length,
143+
static_summary: decoded_metadata.summary,
144+
static_details: decoded_metadata.details,
145+
})
75146
}
76147

77-
/// The static summary set when the workflow was started, if any.
78-
// TOOD: Use DataConverter to avoid direct dependency on serde_json
79-
pub fn static_summary(&self) -> Option<String> {
80-
let payload = self
81-
.raw_description
82-
.execution_config
83-
.as_ref()?
84-
.user_metadata
85-
.as_ref()?
86-
.summary
87-
.as_ref()?;
88-
serde_json::from_slice(&payload.data).ok()
89-
}
90-
91-
/// The static details set when the workflow was started, if any.
92-
// TOOD: Use DataConverter to avoid direct dependency on serde_json
93-
pub fn static_details(&self) -> Option<String> {
94-
let payload = self
95-
.raw_description
96-
.execution_config
97-
.as_ref()?
98-
.user_metadata
99-
.as_ref()?
100-
.details
101-
.as_ref()?;
102-
serde_json::from_slice(&payload.data).ok()
148+
/// The workflow ID.
149+
pub fn id(&self) -> &str {
150+
self.workflow_info()
151+
.and_then(|info| info.execution.as_ref())
152+
.map(|e| e.workflow_id.as_str())
153+
.unwrap_or("")
154+
}
155+
156+
/// The run ID.
157+
pub fn run_id(&self) -> &str {
158+
self.workflow_info()
159+
.and_then(|info| info.execution.as_ref())
160+
.map(|e| e.run_id.as_str())
161+
.unwrap_or("")
162+
}
163+
164+
/// The workflow type name.
165+
pub fn workflow_type(&self) -> &str {
166+
self.workflow_info()
167+
.and_then(|info| info.r#type.as_ref())
168+
.map(|t| t.name.as_str())
169+
.unwrap_or("")
170+
}
171+
172+
/// The current status of the workflow execution.
173+
pub fn status(
174+
&self,
175+
) -> temporalio_common::protos::temporal::api::enums::v1::WorkflowExecutionStatus {
176+
self.workflow_info()
177+
.map(workflow::WorkflowExecutionInfo::status)
178+
.unwrap_or_default()
179+
}
180+
181+
/// When the workflow was created.
182+
pub fn start_time(&self) -> Option<std::time::SystemTime> {
183+
self.workflow_info()
184+
.and_then(|info| info.start_time.as_ref())
185+
.and_then(proto_ts_to_system_time)
186+
}
187+
188+
/// When the workflow run started or should start.
189+
pub fn execution_time(&self) -> Option<std::time::SystemTime> {
190+
self.workflow_info()
191+
.and_then(|info| info.execution_time.as_ref())
192+
.and_then(proto_ts_to_system_time)
193+
}
194+
195+
/// When the workflow was closed, if closed.
196+
pub fn close_time(&self) -> Option<std::time::SystemTime> {
197+
self.workflow_info()
198+
.and_then(|info| info.close_time.as_ref())
199+
.and_then(proto_ts_to_system_time)
200+
}
201+
202+
/// The task queue the workflow runs on.
203+
pub fn task_queue(&self) -> &str {
204+
self.workflow_info()
205+
.map(|info| info.task_queue.as_str())
206+
.unwrap_or("")
207+
}
208+
209+
/// Number of events in history.
210+
pub fn history_length(&self) -> usize {
211+
self.history_length
212+
}
213+
214+
/// Workflow memo after codec decoding.
215+
pub fn memo(&self) -> Option<&temporalio_common::protos::temporal::api::common::v1::Memo> {
216+
self.workflow_info().and_then(|info| info.memo.as_ref())
217+
}
218+
219+
/// Parent workflow ID, if this is a child workflow.
220+
pub fn parent_id(&self) -> Option<&str> {
221+
self.workflow_info()
222+
.and_then(|info| info.parent_execution.as_ref())
223+
.map(|e| e.workflow_id.as_str())
224+
}
225+
226+
/// Parent run ID, if this is a child workflow.
227+
pub fn parent_run_id(&self) -> Option<&str> {
228+
self.workflow_info()
229+
.and_then(|info| info.parent_execution.as_ref())
230+
.map(|e| e.run_id.as_str())
231+
}
232+
233+
/// Search attributes on the workflow.
234+
pub fn search_attributes(
235+
&self,
236+
) -> Option<&temporalio_common::protos::temporal::api::common::v1::SearchAttributes> {
237+
self.workflow_info()
238+
.and_then(|info| info.search_attributes.as_ref())
239+
}
240+
241+
/// Static summary configured on the workflow, if present.
242+
pub fn static_summary(&self) -> Option<&str> {
243+
self.static_summary.as_deref()
244+
}
245+
246+
/// Static details configured on the workflow, if present.
247+
pub fn static_details(&self) -> Option<&str> {
248+
self.static_details.as_deref()
249+
}
250+
251+
/// Access the raw proto for additional fields not exposed via accessors.
252+
pub fn raw(&self) -> &DescribeWorkflowExecutionResponse {
253+
&self.raw_description
254+
}
255+
256+
/// Consume the wrapper and return the raw proto.
257+
pub fn into_raw(self) -> DescribeWorkflowExecutionResponse {
258+
self.raw_description
259+
}
260+
261+
fn workflow_info(&self) -> Option<&workflow::WorkflowExecutionInfo> {
262+
self.raw_description.workflow_execution_info.as_ref()
103263
}
104264
}
105265

@@ -689,7 +849,9 @@ where
689849
.await
690850
.map_err(WorkflowInteractionError::from_status)?
691851
.into_inner();
692-
Ok(WorkflowExecutionDescription::new(response))
852+
WorkflowExecutionDescription::new(response, self.client.data_converter())
853+
.await
854+
.map_err(WorkflowInteractionError::from)
693855
}
694856
/// Fetch workflow execution history.
695857
pub async fn fetch_history(
@@ -841,3 +1003,116 @@ where
8411003
}
8421004
}
8431005
}
1006+
1007+
#[cfg(test)]
1008+
mod tests {
1009+
use super::*;
1010+
use std::collections::HashMap;
1011+
use temporalio_common::protos::temporal::api::{
1012+
common::v1::{Memo, SearchAttributes},
1013+
enums::v1::WorkflowExecutionStatus,
1014+
sdk::v1::UserMetadata,
1015+
workflow::v1::WorkflowExecutionConfig,
1016+
};
1017+
1018+
#[tokio::test]
1019+
async fn workflow_description_accessors_expose_decoded_fields() {
1020+
let converter = DataConverter::default();
1021+
let memo_payload = converter
1022+
.to_payload(&SerializationContextData::Workflow, &"memo-value")
1023+
.await
1024+
.unwrap();
1025+
let search_attr_payload = converter
1026+
.to_payload(&SerializationContextData::Workflow, &"search-value")
1027+
.await
1028+
.unwrap();
1029+
let summary_payload = converter
1030+
.to_payload(&SerializationContextData::Workflow, &"workflow summary")
1031+
.await
1032+
.unwrap();
1033+
let details_payload = converter
1034+
.to_payload(&SerializationContextData::Workflow, &"workflow details")
1035+
.await
1036+
.unwrap();
1037+
let description = WorkflowExecutionDescription::new(
1038+
DescribeWorkflowExecutionResponse {
1039+
workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
1040+
execution: Some(ProtoWorkflowExecution {
1041+
workflow_id: "wf-id".to_string(),
1042+
run_id: "run-id".to_string(),
1043+
}),
1044+
r#type: Some(
1045+
temporalio_common::protos::temporal::api::common::v1::WorkflowType {
1046+
name: "wf-type".to_string(),
1047+
},
1048+
),
1049+
status: WorkflowExecutionStatus::Completed as i32,
1050+
task_queue: "task-queue".to_string(),
1051+
history_length: 42,
1052+
memo: Some(Memo {
1053+
fields: HashMap::from([("memo-key".to_string(), memo_payload.clone())]),
1054+
}),
1055+
parent_execution: Some(ProtoWorkflowExecution {
1056+
workflow_id: "parent-id".to_string(),
1057+
run_id: "parent-run-id".to_string(),
1058+
}),
1059+
search_attributes: Some(SearchAttributes {
1060+
indexed_fields: HashMap::from([(
1061+
"CustomKeywordField".to_string(),
1062+
search_attr_payload.clone(),
1063+
)]),
1064+
}),
1065+
..Default::default()
1066+
}),
1067+
execution_config: Some(WorkflowExecutionConfig {
1068+
user_metadata: Some(UserMetadata {
1069+
summary: Some(summary_payload),
1070+
details: Some(details_payload),
1071+
}),
1072+
..Default::default()
1073+
}),
1074+
..Default::default()
1075+
},
1076+
&converter,
1077+
)
1078+
.await
1079+
.unwrap();
1080+
1081+
assert_eq!(description.id(), "wf-id");
1082+
assert_eq!(description.run_id(), "run-id");
1083+
assert_eq!(description.workflow_type(), "wf-type");
1084+
assert_eq!(description.status(), WorkflowExecutionStatus::Completed);
1085+
assert_eq!(description.task_queue(), "task-queue");
1086+
assert_eq!(description.history_length(), 42);
1087+
assert_eq!(description.parent_id(), Some("parent-id"));
1088+
assert_eq!(description.parent_run_id(), Some("parent-run-id"));
1089+
assert_eq!(description.memo().unwrap().fields["memo-key"], memo_payload);
1090+
assert_eq!(
1091+
description.search_attributes().unwrap().indexed_fields["CustomKeywordField"],
1092+
search_attr_payload
1093+
);
1094+
assert_eq!(description.static_summary(), Some("workflow summary"));
1095+
assert_eq!(description.static_details(), Some("workflow details"));
1096+
}
1097+
1098+
#[tokio::test]
1099+
async fn workflow_description_rejects_negative_history_length() {
1100+
let err = WorkflowExecutionDescription::new(
1101+
DescribeWorkflowExecutionResponse {
1102+
workflow_execution_info: Some(workflow::WorkflowExecutionInfo {
1103+
history_length: -1,
1104+
..Default::default()
1105+
}),
1106+
..Default::default()
1107+
},
1108+
&DataConverter::default(),
1109+
)
1110+
.await
1111+
.unwrap_err();
1112+
1113+
assert_eq!(
1114+
err.to_string(),
1115+
"Encoding error: workflow history_length must be non-negative, got -1"
1116+
);
1117+
}
1118+
}

0 commit comments

Comments
 (0)