Skip to content

Commit 031579d

Browse files
fix(sdk): normalize how user metadata is encoded
1 parent 00d3888 commit 031579d

6 files changed

Lines changed: 96 additions & 38 deletions

File tree

crates/client/src/lib.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,12 @@ use std::{
7070
};
7171
use temporalio_common::{
7272
HasWorkflowDefinition,
73-
data_converters::{DataConverter, SerializationContextData},
73+
data_converters::{
74+
DataConverter, GenericPayloadConverter, PayloadConverter, SerializationContext,
75+
SerializationContextData,
76+
},
7477
protos::{
75-
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
78+
coresdk::IntoPayloadsExt,
7679
grpc::health::v1::health_client::HealthClient,
7780
proto_ts_to_system_time,
7881
temporal::api::{
@@ -1033,13 +1036,20 @@ where
10331036

10341037
let user_metadata = if options.static_summary.is_some() || options.static_details.is_some()
10351038
{
1039+
let payload_converter = PayloadConverter::default();
1040+
let context = SerializationContext {
1041+
data: &SerializationContextData::Workflow,
1042+
converter: &payload_converter,
1043+
};
10361044
Some(UserMetadata {
10371045
summary: options.static_summary.map(|s| {
1038-
s.as_json_payload()
1046+
payload_converter
1047+
.to_payload(&context, &s)
10391048
.expect("String-to-JSON payload serialization is infallible")
10401049
}),
10411050
details: options.static_details.map(|s| {
1042-
s.as_json_payload()
1051+
payload_converter
1052+
.to_payload(&context, &s)
10431053
.expect("String-to-JSON payload serialization is infallible")
10441054
}),
10451055
})

crates/client/src/workflow_handle.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{fmt::Debug, marker::PhantomData};
1212
pub use temporalio_common::UntypedWorkflow;
1313
use temporalio_common::{
1414
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15-
data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData},
15+
data_converters::{
16+
DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
17+
SerializationContext, SerializationContextData,
18+
},
1619
payload_visitor::decode_payloads,
1720
protos::{
1821
coresdk::FromPayloadsExt,
@@ -48,21 +51,25 @@ struct DecodedUserMetadata {
4851
details: Option<String>,
4952
}
5053

51-
async fn decode_user_metadata(
52-
data_converter: &DataConverter,
54+
fn decode_user_metadata(
5355
context: &SerializationContextData,
5456
user_metadata: Option<UserMetadata>,
5557
) -> Result<DecodedUserMetadata, PayloadConversionError> {
58+
let payload_converter = PayloadConverter::default();
59+
let context = SerializationContext {
60+
data: context,
61+
converter: &payload_converter,
62+
};
5663
let (summary, details) = user_metadata
5764
.map(|metadata| (metadata.summary, metadata.details))
5865
.unwrap_or_default();
5966
Ok(DecodedUserMetadata {
6067
summary: match summary {
61-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
68+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6269
None => None,
6370
},
6471
details: match details {
65-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
72+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6673
None => None,
6774
},
6875
})
@@ -119,12 +126,8 @@ impl WorkflowExecutionDescription {
119126
&SerializationContextData::Workflow,
120127
)
121128
.await;
122-
let decoded_metadata = decode_user_metadata(
123-
data_converter,
124-
&SerializationContextData::Workflow,
125-
raw_user_metadata,
126-
)
127-
.await?;
129+
let decoded_metadata =
130+
decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
128131
let history_length_raw = raw_description
129132
.workflow_execution_info
130133
.as_ref()

crates/sdk-core/tests/integ_tests/data_converter_tests.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,20 @@ async fn describe_decodes_workflow_payload_fields() {
464464
desc.memo().unwrap().fields["tracked"],
465465
"codec-describe".as_json_payload().unwrap()
466466
);
467+
let raw_user_metadata = desc
468+
.raw_description
469+
.execution_config
470+
.as_ref()
471+
.and_then(|cfg| cfg.user_metadata.as_ref())
472+
.expect("describe response should include user metadata");
473+
assert_eq!(
474+
raw_user_metadata.summary,
475+
Some("codec summary".as_json_payload().unwrap())
476+
);
477+
assert_eq!(
478+
raw_user_metadata.details,
479+
Some("codec details".as_json_payload().unwrap())
480+
);
467481
assert_eq!(desc.static_summary(), Some("codec summary"));
468482
assert_eq!(desc.static_details(), Some("codec details"));
469483
}

crates/sdk-core/tests/integ_tests/workflow_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use temporalio_common::{
4444
protos::{
4545
DEFAULT_WORKFLOW_TYPE, canned_histories,
4646
coresdk::{
47-
ActivityTaskCompletion, IntoCompletion,
47+
ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion,
4848
activity_result::ActivityExecutionResult,
4949
workflow_activation::{WorkflowActivationJob, workflow_activation_job},
5050
workflow_commands::{
@@ -1061,7 +1061,7 @@ async fn pass_timer_summary_to_metadata() {
10611061
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
10621062
let wf_id = mock_cfg.hists[0].wf_id.clone();
10631063
let expected_user_metadata = Some(UserMetadata {
1064-
summary: Some(b"timer summary".into()),
1064+
summary: Some("timer summary".as_json_payload().unwrap()),
10651065
details: None,
10661066
});
10671067
mock_cfg.completion_asserts_from_expectations(|mut asserts| {

crates/sdk/src/workflow_context.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ impl BaseWorkflowContext {
421421
let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
422422
let (cmd, unblocker) =
423423
CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
424+
let payload_converter = PayloadConverter::default();
425+
let context = SerializationContext {
426+
data: &SerializationContextData::Workflow,
427+
converter: &payload_converter,
428+
};
424429
self.send(
425430
CommandCreateRequest {
426431
cmd: WorkflowCommand {
@@ -436,7 +441,11 @@ impl BaseWorkflowContext {
436441
.into(),
437442
),
438443
user_metadata: Some(UserMetadata {
439-
summary: opts.summary.map(|x| x.as_bytes().into()),
444+
summary: opts.summary.map(|summary| {
445+
payload_converter
446+
.to_payload(&context, &summary)
447+
.expect("String-to-JSON payload serialization is infallible")
448+
}),
440449
details: None,
441450
}),
442451
},

crates/sdk/src/workflow_context/options.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
use std::{collections::HashMap, time::Duration};
22

33
use temporalio_client::Priority;
4-
use temporalio_common::protos::{
5-
coresdk::{
6-
AsJsonPayloadExt,
7-
child_workflow::ChildWorkflowCancellationType,
8-
common::VersioningIntent,
9-
nexus::NexusOperationCancellationType,
10-
workflow_commands::{
11-
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
12-
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
13-
WorkflowCommand,
14-
},
4+
use temporalio_common::{
5+
data_converters::{
6+
GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
157
},
16-
temporal::api::{
17-
common::v1::{Payload, RetryPolicy, SearchAttributes},
18-
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
19-
sdk::v1::UserMetadata,
8+
protos::{
9+
coresdk::{
10+
child_workflow::ChildWorkflowCancellationType,
11+
common::VersioningIntent,
12+
nexus::NexusOperationCancellationType,
13+
workflow_commands::{
14+
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
15+
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
16+
WorkflowCommand,
17+
},
18+
},
19+
temporal::api::{
20+
common::v1::{Payload, RetryPolicy, SearchAttributes},
21+
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
22+
sdk::v1::UserMetadata,
23+
},
2024
},
2125
};
2226
// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
@@ -81,6 +85,11 @@ impl ActivityOptions {
8185
arguments: Vec<Payload>,
8286
seq: u32,
8387
) -> WorkflowCommand {
88+
let payload_converter = PayloadConverter::default();
89+
let context = SerializationContext {
90+
data: &SerializationContextData::Workflow,
91+
converter: &payload_converter,
92+
};
8493
WorkflowCommand {
8594
variant: Some(
8695
ScheduleActivity {
@@ -113,7 +122,8 @@ impl ActivityOptions {
113122
user_metadata: self
114123
.summary
115124
.map(|s| {
116-
s.as_json_payload()
125+
payload_converter
126+
.to_payload(&context, &s)
117127
.expect("String-to-JSON payload serialization is infallible")
118128
})
119129
.map(|summary| UserMetadata {
@@ -171,6 +181,11 @@ impl LocalActivityOptions {
171181
arguments: Vec<Payload>,
172182
seq: u32,
173183
) -> WorkflowCommand {
184+
let payload_converter = PayloadConverter::default();
185+
let context = SerializationContext {
186+
data: &SerializationContextData::Workflow,
187+
converter: &payload_converter,
188+
};
174189
// Allow tests to avoid extra verbosity when they don't care about timeouts
175190
// TODO: Builderize LA options
176191
self.schedule_to_close_timeout
@@ -209,8 +224,8 @@ impl LocalActivityOptions {
209224
user_metadata: self
210225
.summary
211226
.map(|summary| {
212-
summary
213-
.as_json_payload()
227+
payload_converter
228+
.to_payload(&context, &summary)
214229
.expect("String-to-JSON payload serialization is infallible")
215230
})
216231
.map(|summary| UserMetadata {
@@ -261,14 +276,21 @@ impl ChildWorkflowOptions {
261276
input: Vec<Payload>,
262277
seq: u32,
263278
) -> WorkflowCommand {
279+
let payload_converter = PayloadConverter::default();
280+
let context = SerializationContext {
281+
data: &SerializationContextData::Workflow,
282+
converter: &payload_converter,
283+
};
264284
let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
265285
Some(UserMetadata {
266286
summary: self.static_summary.map(|s| {
267-
s.as_json_payload()
287+
payload_converter
288+
.to_payload(&context, &s)
268289
.expect("String-to-JSON payload serialization is infallible")
269290
}),
270291
details: self.static_details.map(|s| {
271-
s.as_json_payload()
292+
payload_converter
293+
.to_payload(&context, &s)
272294
.expect("String-to-JSON payload serialization is infallible")
273295
}),
274296
})

0 commit comments

Comments
 (0)