From fefe920bb72942d44f089b5dca18583e3237d943 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 14 Apr 2026 15:02:18 -0400 Subject: [PATCH 1/2] fix(sdk): normalize how user metadata is encoded --- crates/client/src/lib.rs | 18 ++++-- crates/client/src/workflow_handle.rs | 25 ++++---- .../tests/integ_tests/data_converter_tests.rs | 14 +++++ .../tests/integ_tests/workflow_tests.rs | 4 +- crates/sdk/src/workflow_context.rs | 11 +++- crates/sdk/src/workflow_context/options.rs | 62 +++++++++++++------ 6 files changed, 96 insertions(+), 38 deletions(-) diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index c99f7b06d..1f1f6ef05 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -71,9 +71,12 @@ use std::{ }; use temporalio_common::{ HasWorkflowDefinition, - data_converters::{DataConverter, SerializationContextData}, + data_converters::{ + DataConverter, GenericPayloadConverter, PayloadConverter, SerializationContext, + SerializationContextData, + }, protos::{ - coresdk::{AsJsonPayloadExt, IntoPayloadsExt}, + coresdk::IntoPayloadsExt, grpc::health::v1::health_client::HealthClient, proto_ts_to_system_time, temporal::api::{ @@ -1063,13 +1066,20 @@ where let user_metadata = if options.static_summary.is_some() || options.static_details.is_some() { + let payload_converter = PayloadConverter::default(); + let context = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &payload_converter, + }; Some(UserMetadata { summary: options.static_summary.map(|s| { - s.as_json_payload() + payload_converter + .to_payload(&context, &s) .expect("String-to-JSON payload serialization is infallible") }), details: options.static_details.map(|s| { - s.as_json_payload() + payload_converter + .to_payload(&context, &s) .expect("String-to-JSON payload serialization is infallible") }), }) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 4628782a1..976dd07e7 100644 --- 
a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -12,7 +12,10 @@ use std::{fmt::Debug, marker::PhantomData}; pub use temporalio_common::UntypedWorkflow; use temporalio_common::{ HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition, - data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData}, + data_converters::{ + DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue, + SerializationContext, SerializationContextData, + }, payload_visitor::decode_payloads, protos::{ coresdk::FromPayloadsExt, @@ -48,21 +51,25 @@ struct DecodedUserMetadata { details: Option, } -async fn decode_user_metadata( - data_converter: &DataConverter, +fn decode_user_metadata( context: &SerializationContextData, user_metadata: Option, ) -> Result { + let payload_converter = PayloadConverter::default(); + let context = SerializationContext { + data: context, + converter: &payload_converter, + }; let (summary, details) = user_metadata .map(|metadata| (metadata.summary, metadata.details)) .unwrap_or_default(); Ok(DecodedUserMetadata { summary: match summary { - Some(payload) => Some(data_converter.from_payload(context, payload).await?), + Some(payload) => Some(payload_converter.from_payload(&context, payload)?), None => None, }, details: match details { - Some(payload) => Some(data_converter.from_payload(context, payload).await?), + Some(payload) => Some(payload_converter.from_payload(&context, payload)?), None => None, }, }) @@ -119,12 +126,8 @@ impl WorkflowExecutionDescription { &SerializationContextData::Workflow, ) .await; - let decoded_metadata = decode_user_metadata( - data_converter, - &SerializationContextData::Workflow, - raw_user_metadata, - ) - .await?; + let decoded_metadata = + decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?; let history_length_raw = raw_description .workflow_execution_info .as_ref() diff 
--git a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs index 0f90147a7..6d984b37d 100644 --- a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs +++ b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs @@ -464,6 +464,20 @@ async fn describe_decodes_workflow_payload_fields() { desc.memo().unwrap().fields["tracked"], "codec-describe".as_json_payload().unwrap() ); + let raw_user_metadata = desc + .raw_description + .execution_config + .as_ref() + .and_then(|cfg| cfg.user_metadata.as_ref()) + .expect("describe response should include user metadata"); + assert_eq!( + raw_user_metadata.summary, + Some("codec summary".as_json_payload().unwrap()) + ); + assert_eq!( + raw_user_metadata.details, + Some("codec details".as_json_payload().unwrap()) + ); assert_eq!(desc.static_summary(), Some("codec summary")); assert_eq!(desc.static_details(), Some("codec details")); } diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests.rs b/crates/sdk-core/tests/integ_tests/workflow_tests.rs index 1a5945a36..70e7c368f 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests.rs @@ -44,7 +44,7 @@ use temporalio_common::{ protos::{ DEFAULT_WORKFLOW_TYPE, canned_histories, coresdk::{ - ActivityTaskCompletion, IntoCompletion, + ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion, activity_result::ActivityExecutionResult, workflow_activation::{WorkflowActivationJob, workflow_activation_job}, workflow_commands::{ @@ -1061,7 +1061,7 @@ async fn pass_timer_summary_to_metadata() { let mut mock_cfg = MockPollCfg::from_hist_builder(t); let wf_id = mock_cfg.hists[0].wf_id.clone(); let expected_user_metadata = Some(UserMetadata { - summary: Some(b"timer summary".into()), + summary: Some("timer summary".as_json_payload().unwrap()), details: None, }); mock_cfg.completion_asserts_from_expectations(|mut asserts| { diff --git 
a/crates/sdk/src/workflow_context.rs b/crates/sdk/src/workflow_context.rs index ec4f912b7..809e4d6ca 100644 --- a/crates/sdk/src/workflow_context.rs +++ b/crates/sdk/src/workflow_context.rs @@ -421,6 +421,11 @@ impl BaseWorkflowContext { let seq = self.inner.seq_nums.borrow_mut().next_timer_seq(); let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone()); + let payload_converter = PayloadConverter::default(); + let context = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &payload_converter, + }; self.send( CommandCreateRequest { cmd: WorkflowCommand { @@ -436,7 +441,11 @@ impl BaseWorkflowContext { .into(), ), user_metadata: Some(UserMetadata { - summary: opts.summary.map(|x| x.as_bytes().into()), + summary: opts.summary.map(|summary| { + payload_converter + .to_payload(&context, &summary) + .expect("String-to-JSON payload serialization is infallible") + }), details: None, }), }, diff --git a/crates/sdk/src/workflow_context/options.rs b/crates/sdk/src/workflow_context/options.rs index 678f55cd4..b2a75b927 100644 --- a/crates/sdk/src/workflow_context/options.rs +++ b/crates/sdk/src/workflow_context/options.rs @@ -1,22 +1,26 @@ use std::{collections::HashMap, time::Duration}; use temporalio_client::Priority; -use temporalio_common::protos::{ - coresdk::{ - AsJsonPayloadExt, - child_workflow::ChildWorkflowCancellationType, - common::VersioningIntent, - nexus::NexusOperationCancellationType, - workflow_commands::{ - ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity, - ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution, - WorkflowCommand, - }, +use temporalio_common::{ + data_converters::{ + GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData, }, - temporal::api::{ - common::v1::{Payload, RetryPolicy, SearchAttributes}, - enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy}, - sdk::v1::UserMetadata, + protos::{ + 
coresdk::{ + child_workflow::ChildWorkflowCancellationType, + common::VersioningIntent, + nexus::NexusOperationCancellationType, + workflow_commands::{ + ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity, + ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution, + WorkflowCommand, + }, + }, + temporal::api::{ + common::v1::{Payload, RetryPolicy, SearchAttributes}, + enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy}, + sdk::v1::UserMetadata, + }, }, }; // TODO: Before release, probably best to avoid using proto types entirely here. They're awkward. @@ -81,6 +85,11 @@ impl ActivityOptions { arguments: Vec, seq: u32, ) -> WorkflowCommand { + let payload_converter = PayloadConverter::default(); + let context = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &payload_converter, + }; WorkflowCommand { variant: Some( ScheduleActivity { @@ -113,7 +122,8 @@ impl ActivityOptions { user_metadata: self .summary .map(|s| { - s.as_json_payload() + payload_converter + .to_payload(&context, &s) .expect("String-to-JSON payload serialization is infallible") }) .map(|summary| UserMetadata { @@ -171,6 +181,11 @@ impl LocalActivityOptions { arguments: Vec, seq: u32, ) -> WorkflowCommand { + let payload_converter = PayloadConverter::default(); + let context = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &payload_converter, + }; // Allow tests to avoid extra verbosity when they don't care about timeouts // TODO: Builderize LA options self.schedule_to_close_timeout @@ -209,8 +224,8 @@ impl LocalActivityOptions { user_metadata: self .summary .map(|summary| { - summary - .as_json_payload() + payload_converter + .to_payload(&context, &summary) .expect("String-to-JSON payload serialization is infallible") }) .map(|summary| UserMetadata { @@ -261,14 +276,21 @@ impl ChildWorkflowOptions { input: Vec, seq: u32, ) -> WorkflowCommand { + let payload_converter = 
PayloadConverter::default(); + let context = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &payload_converter, + }; let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() { Some(UserMetadata { summary: self.static_summary.map(|s| { - s.as_json_payload() + payload_converter + .to_payload(&context, &s) .expect("String-to-JSON payload serialization is infallible") }), details: self.static_details.map(|s| { - s.as_json_payload() + payload_converter + .to_payload(&context, &s) .expect("String-to-JSON payload serialization is infallible") }), }) From 51a5adb61e20a615631f0f845120405b580ee524 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 14 Apr 2026 15:27:07 -0400 Subject: [PATCH 2/2] chore: add test asserting codec isn't used when decoding user metadata --- .../tests/integ_tests/data_converter_tests.rs | 76 ++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs index 6d984b37d..fc3f770b4 100644 --- a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs +++ b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs @@ -284,6 +284,7 @@ async fn multi_args_serializes_as_multiple_payloads() { /// A codec that XORs payload data with a key and tracks encode/decode operations.
struct XorCodec { key: u8, + gate_on_metadata: bool, encode_count: AtomicUsize, decode_count: AtomicUsize, } @@ -291,12 +292,22 @@ struct XorCodec { impl XorCodec { fn new(key: u8) -> Self { Self { + gate_on_metadata: true, key, encode_count: AtomicUsize::new(0), decode_count: AtomicUsize::new(0), } } + fn new_with_metadata_gate(key: u8, gate_on_metadata: bool) -> Self { + Self { + key, + gate_on_metadata, + encode_count: AtomicUsize::new(0), + decode_count: AtomicUsize::new(0), + } + } + fn encode_count(&self) -> usize { self.encode_count.load(Ordering::SeqCst) } @@ -316,12 +327,15 @@ impl PayloadCodec for XorCodec { eprintln!("XorCodec::encode called with {} payloads", count); self.encode_count.fetch_add(count, Ordering::SeqCst); let key = self.key; + let gate_on_metadata = self.gate_on_metadata; async move { payloads .into_iter() .map(|mut p| { p.data = p.data.iter().map(|b| b ^ key).collect(); - p.metadata.insert("xor_encoded".to_string(), vec![key]); + if gate_on_metadata { + p.metadata.insert("xor_encoded".to_string(), vec![key]); + } p }) .collect() @@ -338,11 +352,12 @@ impl PayloadCodec for XorCodec { eprintln!("XorCodec::decode called with {} payloads", count); self.decode_count.fetch_add(count, Ordering::SeqCst); let key = self.key; + let gate_on_metadata = self.gate_on_metadata; async move { payloads .into_iter() .map(|mut p| { - if p.metadata.remove("xor_encoded").is_some() { + if !gate_on_metadata || p.metadata.remove("xor_encoded").is_some() { p.data = p.data.iter().map(|b| b ^ key).collect(); } p @@ -481,3 +496,60 @@ async fn describe_decodes_workflow_payload_fields() { assert_eq!(desc.static_summary(), Some("codec summary")); assert_eq!(desc.static_details(), Some("codec details")); } + +#[tokio::test] +async fn describe_decodes_user_metadata_with_ungated_xor_codec() { + let wf_name = DescribeDataConverterWorkflow::name(); + let codec = Arc::new(XorCodec::new_with_metadata_gate(0x42, false)); + + let connection = get_integ_connection(None).await; + 
let data_converter = DataConverter::new( + PayloadConverter::default(), + DefaultFailureConverter, + codec.clone(), + ); + let client_opts = ClientOptions::new(integ_namespace()) + .data_converter(data_converter) + .build(); + let client = Client::new(connection, client_opts).unwrap(); + + let mut starter = CoreWfStarter::new_with_overrides(wf_name, None, Some(client)); + starter.sdk_config.register_activities(TestActivities); + starter.sdk_config.task_types = WorkerTaskTypes::all(); + starter + .sdk_config + .register_workflow::(); + let wf_id = starter.get_task_queue().to_owned(); + let mut worker = starter.worker().await; + + let handle = worker + .submit_workflow( + DescribeDataConverterWorkflow::run, + TrackedWrapper(TrackedValue::new("codec-describe".to_string())), + WorkflowStartOptions::new(starter.get_task_queue(), wf_id) + .static_summary("codec summary") + .static_details("codec details") + .build(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + let decode_count_before = codec.decode_count(); + let desc = handle + .describe(WorkflowDescribeOptions::default()) + .await + .unwrap(); + + assert!( + codec.decode_count() > decode_count_before, + "Describe should have decoded response payloads" + ); + assert_eq!( + desc.memo().unwrap().fields["tracked"], + "codec-describe".as_json_payload().unwrap() + ); + // Making sure codec isn't used when decoding user metadata + assert_eq!(desc.static_summary(), Some("codec summary")); + assert_eq!(desc.static_details(), Some("codec details")); +}