Skip to content

Commit 33b8680

Browse files
fix(sdk): normalize how user metadata is encoded (#1216)
* fix(sdk): normalize how user metadata is encoded * chore: add test asserting codec isnt used decoding user metadata
1 parent d7ebff8 commit 33b8680

6 files changed

Lines changed: 170 additions & 40 deletions

File tree

crates/client/src/lib.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@ use std::{
7171
};
7272
use temporalio_common::{
7373
HasWorkflowDefinition,
74-
data_converters::{DataConverter, SerializationContextData},
74+
data_converters::{
75+
DataConverter, GenericPayloadConverter, PayloadConverter, SerializationContext,
76+
SerializationContextData,
77+
},
7578
protos::{
76-
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
79+
coresdk::IntoPayloadsExt,
7780
grpc::health::v1::health_client::HealthClient,
7881
proto_ts_to_system_time,
7982
temporal::api::{
@@ -1063,13 +1066,20 @@ where
10631066

10641067
let user_metadata = if options.static_summary.is_some() || options.static_details.is_some()
10651068
{
1069+
let payload_converter = PayloadConverter::default();
1070+
let context = SerializationContext {
1071+
data: &SerializationContextData::Workflow,
1072+
converter: &payload_converter,
1073+
};
10661074
Some(UserMetadata {
10671075
summary: options.static_summary.map(|s| {
1068-
s.as_json_payload()
1076+
payload_converter
1077+
.to_payload(&context, &s)
10691078
.expect("String-to-JSON payload serialization is infallible")
10701079
}),
10711080
details: options.static_details.map(|s| {
1072-
s.as_json_payload()
1081+
payload_converter
1082+
.to_payload(&context, &s)
10731083
.expect("String-to-JSON payload serialization is infallible")
10741084
}),
10751085
})

crates/client/src/workflow_handle.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{fmt::Debug, marker::PhantomData};
1212
pub use temporalio_common::UntypedWorkflow;
1313
use temporalio_common::{
1414
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15-
data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData},
15+
data_converters::{
16+
DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
17+
SerializationContext, SerializationContextData,
18+
},
1619
payload_visitor::decode_payloads,
1720
protos::{
1821
coresdk::FromPayloadsExt,
@@ -48,21 +51,25 @@ struct DecodedUserMetadata {
4851
details: Option<String>,
4952
}
5053

51-
async fn decode_user_metadata(
52-
data_converter: &DataConverter,
54+
fn decode_user_metadata(
5355
context: &SerializationContextData,
5456
user_metadata: Option<UserMetadata>,
5557
) -> Result<DecodedUserMetadata, PayloadConversionError> {
58+
let payload_converter = PayloadConverter::default();
59+
let context = SerializationContext {
60+
data: context,
61+
converter: &payload_converter,
62+
};
5663
let (summary, details) = user_metadata
5764
.map(|metadata| (metadata.summary, metadata.details))
5865
.unwrap_or_default();
5966
Ok(DecodedUserMetadata {
6067
summary: match summary {
61-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
68+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6269
None => None,
6370
},
6471
details: match details {
65-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
72+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6673
None => None,
6774
},
6875
})
@@ -119,12 +126,8 @@ impl WorkflowExecutionDescription {
119126
&SerializationContextData::Workflow,
120127
)
121128
.await;
122-
let decoded_metadata = decode_user_metadata(
123-
data_converter,
124-
&SerializationContextData::Workflow,
125-
raw_user_metadata,
126-
)
127-
.await?;
129+
let decoded_metadata =
130+
decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
128131
let history_length_raw = raw_description
129132
.workflow_execution_info
130133
.as_ref()

crates/sdk-core/tests/integ_tests/data_converter_tests.rs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,19 +284,30 @@ async fn multi_args_serializes_as_multiple_payloads() {
284284
/// A codec that XORs payload data with a key and tracks encode/decode operations.
285285
struct XorCodec {
286286
key: u8,
287+
gate_on_metadata: bool,
287288
encode_count: AtomicUsize,
288289
decode_count: AtomicUsize,
289290
}
290291

291292
impl XorCodec {
292293
fn new(key: u8) -> Self {
293294
Self {
295+
gate_on_metadata: true,
294296
key,
295297
encode_count: AtomicUsize::new(0),
296298
decode_count: AtomicUsize::new(0),
297299
}
298300
}
299301

302+
fn new_with_metadata_gate(key: u8, gate_on_metadata: bool) -> Self {
303+
Self {
304+
key,
305+
gate_on_metadata,
306+
encode_count: AtomicUsize::new(0),
307+
decode_count: AtomicUsize::new(0),
308+
}
309+
}
310+
300311
fn encode_count(&self) -> usize {
301312
self.encode_count.load(Ordering::SeqCst)
302313
}
@@ -316,12 +327,15 @@ impl PayloadCodec for XorCodec {
316327
eprintln!("XorCodec::encode called with {} payloads", count);
317328
self.encode_count.fetch_add(count, Ordering::SeqCst);
318329
let key = self.key;
330+
let gate_on_metadata = self.gate_on_metadata;
319331
async move {
320332
payloads
321333
.into_iter()
322334
.map(|mut p| {
323335
p.data = p.data.iter().map(|b| b ^ key).collect();
324-
p.metadata.insert("xor_encoded".to_string(), vec![key]);
336+
if gate_on_metadata {
337+
p.metadata.insert("xor_encoded".to_string(), vec![key]);
338+
}
325339
p
326340
})
327341
.collect()
@@ -338,11 +352,12 @@ impl PayloadCodec for XorCodec {
338352
eprintln!("XorCodec::decode called with {} payloads", count);
339353
self.decode_count.fetch_add(count, Ordering::SeqCst);
340354
let key = self.key;
355+
let gate_on_metadata = self.gate_on_metadata;
341356
async move {
342357
payloads
343358
.into_iter()
344359
.map(|mut p| {
345-
if p.metadata.remove("xor_encoded").is_some() {
360+
if !gate_on_metadata || p.metadata.remove("xor_encoded").is_some() {
346361
p.data = p.data.iter().map(|b| b ^ key).collect();
347362
}
348363
p
@@ -464,6 +479,77 @@ async fn describe_decodes_workflow_payload_fields() {
464479
desc.memo().unwrap().fields["tracked"],
465480
"codec-describe".as_json_payload().unwrap()
466481
);
482+
let raw_user_metadata = desc
483+
.raw_description
484+
.execution_config
485+
.as_ref()
486+
.and_then(|cfg| cfg.user_metadata.as_ref())
487+
.expect("describe response should include user metadata");
488+
assert_eq!(
489+
raw_user_metadata.summary,
490+
Some("codec summary".as_json_payload().unwrap())
491+
);
492+
assert_eq!(
493+
raw_user_metadata.details,
494+
Some("codec details".as_json_payload().unwrap())
495+
);
496+
assert_eq!(desc.static_summary(), Some("codec summary"));
497+
assert_eq!(desc.static_details(), Some("codec details"));
498+
}
499+
500+
#[tokio::test]
501+
async fn describe_decodes_user_metadata_with_ungated_xor_codec() {
502+
let wf_name = DescribeDataConverterWorkflow::name();
503+
let codec = Arc::new(XorCodec::new_with_metadata_gate(0x42, false));
504+
505+
let connection = get_integ_connection(None).await;
506+
let data_converter = DataConverter::new(
507+
PayloadConverter::default(),
508+
DefaultFailureConverter,
509+
codec.clone(),
510+
);
511+
let client_opts = ClientOptions::new(integ_namespace())
512+
.data_converter(data_converter)
513+
.build();
514+
let client = Client::new(connection, client_opts).unwrap();
515+
516+
let mut starter = CoreWfStarter::new_with_overrides(wf_name, None, Some(client));
517+
starter.sdk_config.register_activities(TestActivities);
518+
starter.sdk_config.task_types = WorkerTaskTypes::all();
519+
starter
520+
.sdk_config
521+
.register_workflow::<DescribeDataConverterWorkflow>();
522+
let wf_id = starter.get_task_queue().to_owned();
523+
let mut worker = starter.worker().await;
524+
525+
let handle = worker
526+
.submit_workflow(
527+
DescribeDataConverterWorkflow::run,
528+
TrackedWrapper(TrackedValue::new("codec-describe".to_string())),
529+
WorkflowStartOptions::new(starter.get_task_queue(), wf_id)
530+
.static_summary("codec summary")
531+
.static_details("codec details")
532+
.build(),
533+
)
534+
.await
535+
.unwrap();
536+
worker.run_until_done().await.unwrap();
537+
538+
let decode_count_before = codec.decode_count();
539+
let desc = handle
540+
.describe(WorkflowDescribeOptions::default())
541+
.await
542+
.unwrap();
543+
544+
assert!(
545+
codec.decode_count() > decode_count_before,
546+
"Describe should have decoded response payloads"
547+
);
548+
assert_eq!(
549+
desc.memo().unwrap().fields["tracked"],
550+
"codec-describe".as_json_payload().unwrap()
551+
);
552+
// Making sure codec isn't used when decoding user metadata
467553
assert_eq!(desc.static_summary(), Some("codec summary"));
468554
assert_eq!(desc.static_details(), Some("codec details"));
469555
}

crates/sdk-core/tests/integ_tests/workflow_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use temporalio_common::{
4444
protos::{
4545
DEFAULT_WORKFLOW_TYPE, canned_histories,
4646
coresdk::{
47-
ActivityTaskCompletion, IntoCompletion,
47+
ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion,
4848
activity_result::ActivityExecutionResult,
4949
workflow_activation::{WorkflowActivationJob, workflow_activation_job},
5050
workflow_commands::{
@@ -1061,7 +1061,7 @@ async fn pass_timer_summary_to_metadata() {
10611061
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
10621062
let wf_id = mock_cfg.hists[0].wf_id.clone();
10631063
let expected_user_metadata = Some(UserMetadata {
1064-
summary: Some(b"timer summary".into()),
1064+
summary: Some("timer summary".as_json_payload().unwrap()),
10651065
details: None,
10661066
});
10671067
mock_cfg.completion_asserts_from_expectations(|mut asserts| {

crates/sdk/src/workflow_context.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ impl BaseWorkflowContext {
421421
let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
422422
let (cmd, unblocker) =
423423
CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
424+
let payload_converter = PayloadConverter::default();
425+
let context = SerializationContext {
426+
data: &SerializationContextData::Workflow,
427+
converter: &payload_converter,
428+
};
424429
self.send(
425430
CommandCreateRequest {
426431
cmd: WorkflowCommand {
@@ -436,7 +441,11 @@ impl BaseWorkflowContext {
436441
.into(),
437442
),
438443
user_metadata: Some(UserMetadata {
439-
summary: opts.summary.map(|x| x.as_bytes().into()),
444+
summary: opts.summary.map(|summary| {
445+
payload_converter
446+
.to_payload(&context, &summary)
447+
.expect("String-to-JSON payload serialization is infallible")
448+
}),
440449
details: None,
441450
}),
442451
},

crates/sdk/src/workflow_context/options.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
use std::{collections::HashMap, time::Duration};
22

33
use temporalio_client::Priority;
4-
use temporalio_common::protos::{
5-
coresdk::{
6-
AsJsonPayloadExt,
7-
child_workflow::ChildWorkflowCancellationType,
8-
common::VersioningIntent,
9-
nexus::NexusOperationCancellationType,
10-
workflow_commands::{
11-
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
12-
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
13-
WorkflowCommand,
14-
},
4+
use temporalio_common::{
5+
data_converters::{
6+
GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
157
},
16-
temporal::api::{
17-
common::v1::{Payload, RetryPolicy, SearchAttributes},
18-
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
19-
sdk::v1::UserMetadata,
8+
protos::{
9+
coresdk::{
10+
child_workflow::ChildWorkflowCancellationType,
11+
common::VersioningIntent,
12+
nexus::NexusOperationCancellationType,
13+
workflow_commands::{
14+
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
15+
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
16+
WorkflowCommand,
17+
},
18+
},
19+
temporal::api::{
20+
common::v1::{Payload, RetryPolicy, SearchAttributes},
21+
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
22+
sdk::v1::UserMetadata,
23+
},
2024
},
2125
};
2226
// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
@@ -81,6 +85,11 @@ impl ActivityOptions {
8185
arguments: Vec<Payload>,
8286
seq: u32,
8387
) -> WorkflowCommand {
88+
let payload_converter = PayloadConverter::default();
89+
let context = SerializationContext {
90+
data: &SerializationContextData::Workflow,
91+
converter: &payload_converter,
92+
};
8493
WorkflowCommand {
8594
variant: Some(
8695
ScheduleActivity {
@@ -113,7 +122,8 @@ impl ActivityOptions {
113122
user_metadata: self
114123
.summary
115124
.map(|s| {
116-
s.as_json_payload()
125+
payload_converter
126+
.to_payload(&context, &s)
117127
.expect("String-to-JSON payload serialization is infallible")
118128
})
119129
.map(|summary| UserMetadata {
@@ -171,6 +181,11 @@ impl LocalActivityOptions {
171181
arguments: Vec<Payload>,
172182
seq: u32,
173183
) -> WorkflowCommand {
184+
let payload_converter = PayloadConverter::default();
185+
let context = SerializationContext {
186+
data: &SerializationContextData::Workflow,
187+
converter: &payload_converter,
188+
};
174189
// Allow tests to avoid extra verbosity when they don't care about timeouts
175190
// TODO: Builderize LA options
176191
self.schedule_to_close_timeout
@@ -209,8 +224,8 @@ impl LocalActivityOptions {
209224
user_metadata: self
210225
.summary
211226
.map(|summary| {
212-
summary
213-
.as_json_payload()
227+
payload_converter
228+
.to_payload(&context, &summary)
214229
.expect("String-to-JSON payload serialization is infallible")
215230
})
216231
.map(|summary| UserMetadata {
@@ -261,14 +276,21 @@ impl ChildWorkflowOptions {
261276
input: Vec<Payload>,
262277
seq: u32,
263278
) -> WorkflowCommand {
279+
let payload_converter = PayloadConverter::default();
280+
let context = SerializationContext {
281+
data: &SerializationContextData::Workflow,
282+
converter: &payload_converter,
283+
};
264284
let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
265285
Some(UserMetadata {
266286
summary: self.static_summary.map(|s| {
267-
s.as_json_payload()
287+
payload_converter
288+
.to_payload(&context, &s)
268289
.expect("String-to-JSON payload serialization is infallible")
269290
}),
270291
details: self.static_details.map(|s| {
271-
s.as_json_payload()
292+
payload_converter
293+
.to_payload(&context, &s)
272294
.expect("String-to-JSON payload serialization is infallible")
273295
}),
274296
})

0 commit comments

Comments
 (0)