Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ use std::{
};
use temporalio_common::{
HasWorkflowDefinition,
data_converters::{DataConverter, SerializationContextData},
data_converters::{
DataConverter, GenericPayloadConverter, PayloadConverter, SerializationContext,
SerializationContextData,
},
protos::{
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
coresdk::IntoPayloadsExt,
grpc::health::v1::health_client::HealthClient,
proto_ts_to_system_time,
temporal::api::{
Expand Down Expand Up @@ -1063,13 +1066,20 @@ where

let user_metadata = if options.static_summary.is_some() || options.static_details.is_some()
{
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: &SerializationContextData::Workflow,
converter: &payload_converter,
};
Some(UserMetadata {
summary: options.static_summary.map(|s| {
s.as_json_payload()
payload_converter
.to_payload(&context, &s)
.expect("String-to-JSON payload serialization is infallible")
}),
details: options.static_details.map(|s| {
s.as_json_payload()
payload_converter
.to_payload(&context, &s)
.expect("String-to-JSON payload serialization is infallible")
}),
})
Expand Down
25 changes: 14 additions & 11 deletions crates/client/src/workflow_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{fmt::Debug, marker::PhantomData};
pub use temporalio_common::UntypedWorkflow;
use temporalio_common::{
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData},
data_converters::{
DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
SerializationContext, SerializationContextData,
},
payload_visitor::decode_payloads,
protos::{
coresdk::FromPayloadsExt,
Expand Down Expand Up @@ -48,21 +51,25 @@ struct DecodedUserMetadata {
details: Option<String>,
}

async fn decode_user_metadata(
data_converter: &DataConverter,
fn decode_user_metadata(
context: &SerializationContextData,
user_metadata: Option<UserMetadata>,
) -> Result<DecodedUserMetadata, PayloadConversionError> {
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: context,
converter: &payload_converter,
};
let (summary, details) = user_metadata
.map(|metadata| (metadata.summary, metadata.details))
.unwrap_or_default();
Ok(DecodedUserMetadata {
summary: match summary {
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
None => None,
},
details: match details {
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
None => None,
},
})
Expand Down Expand Up @@ -119,12 +126,8 @@ impl WorkflowExecutionDescription {
&SerializationContextData::Workflow,
)
.await;
let decoded_metadata = decode_user_metadata(
data_converter,
&SerializationContextData::Workflow,
raw_user_metadata,
)
.await?;
let decoded_metadata =
decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
let history_length_raw = raw_description
.workflow_execution_info
.as_ref()
Expand Down
90 changes: 88 additions & 2 deletions crates/sdk-core/tests/integ_tests/data_converter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,30 @@ async fn multi_args_serializes_as_multiple_payloads() {
/// A codec that XORs payload data with a key and tracks encode/decode operations.
struct XorCodec {
key: u8,
gate_on_metadata: bool,
encode_count: AtomicUsize,
decode_count: AtomicUsize,
}

impl XorCodec {
fn new(key: u8) -> Self {
Self {
gate_on_metadata: true,
key,
encode_count: AtomicUsize::new(0),
decode_count: AtomicUsize::new(0),
}
}

fn new_with_metadata_gate(key: u8, gate_on_metadata: bool) -> Self {
Self {
key,
gate_on_metadata,
encode_count: AtomicUsize::new(0),
decode_count: AtomicUsize::new(0),
}
}

fn encode_count(&self) -> usize {
self.encode_count.load(Ordering::SeqCst)
}
Expand All @@ -316,12 +327,15 @@ impl PayloadCodec for XorCodec {
eprintln!("XorCodec::encode called with {} payloads", count);
self.encode_count.fetch_add(count, Ordering::SeqCst);
let key = self.key;
let gate_on_metadata = self.gate_on_metadata;
async move {
payloads
.into_iter()
.map(|mut p| {
p.data = p.data.iter().map(|b| b ^ key).collect();
p.metadata.insert("xor_encoded".to_string(), vec![key]);
if gate_on_metadata {
p.metadata.insert("xor_encoded".to_string(), vec![key]);
}
p
})
.collect()
Expand All @@ -338,11 +352,12 @@ impl PayloadCodec for XorCodec {
eprintln!("XorCodec::decode called with {} payloads", count);
self.decode_count.fetch_add(count, Ordering::SeqCst);
let key = self.key;
let gate_on_metadata = self.gate_on_metadata;
async move {
payloads
.into_iter()
.map(|mut p| {
if p.metadata.remove("xor_encoded").is_some() {
if !gate_on_metadata || p.metadata.remove("xor_encoded").is_some() {
p.data = p.data.iter().map(|b| b ^ key).collect();
}
p
Expand Down Expand Up @@ -464,6 +479,77 @@ async fn describe_decodes_workflow_payload_fields() {
desc.memo().unwrap().fields["tracked"],
"codec-describe".as_json_payload().unwrap()
);
let raw_user_metadata = desc
.raw_description
.execution_config
.as_ref()
.and_then(|cfg| cfg.user_metadata.as_ref())
.expect("describe response should include user metadata");
assert_eq!(
raw_user_metadata.summary,
Some("codec summary".as_json_payload().unwrap())
);
assert_eq!(
raw_user_metadata.details,
Some("codec details".as_json_payload().unwrap())
);
assert_eq!(desc.static_summary(), Some("codec summary"));
assert_eq!(desc.static_details(), Some("codec details"));
}

#[tokio::test]
async fn describe_decodes_user_metadata_with_ungated_xor_codec() {
let wf_name = DescribeDataConverterWorkflow::name();
let codec = Arc::new(XorCodec::new_with_metadata_gate(0x42, false));

let connection = get_integ_connection(None).await;
let data_converter = DataConverter::new(
PayloadConverter::default(),
DefaultFailureConverter,
codec.clone(),
);
let client_opts = ClientOptions::new(integ_namespace())
.data_converter(data_converter)
.build();
let client = Client::new(connection, client_opts).unwrap();

let mut starter = CoreWfStarter::new_with_overrides(wf_name, None, Some(client));
starter.sdk_config.register_activities(TestActivities);
starter.sdk_config.task_types = WorkerTaskTypes::all();
starter
.sdk_config
.register_workflow::<DescribeDataConverterWorkflow>();
let wf_id = starter.get_task_queue().to_owned();
let mut worker = starter.worker().await;

let handle = worker
.submit_workflow(
DescribeDataConverterWorkflow::run,
TrackedWrapper(TrackedValue::new("codec-describe".to_string())),
WorkflowStartOptions::new(starter.get_task_queue(), wf_id)
.static_summary("codec summary")
.static_details("codec details")
.build(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();

let decode_count_before = codec.decode_count();
let desc = handle
.describe(WorkflowDescribeOptions::default())
.await
.unwrap();

assert!(
codec.decode_count() > decode_count_before,
"Describe should have decoded response payloads"
);
assert_eq!(
desc.memo().unwrap().fields["tracked"],
"codec-describe".as_json_payload().unwrap()
);
// Making sure codec isn't used when decoding user metadata
assert_eq!(desc.static_summary(), Some("codec summary"));
assert_eq!(desc.static_details(), Some("codec details"));
}
4 changes: 2 additions & 2 deletions crates/sdk-core/tests/integ_tests/workflow_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use temporalio_common::{
protos::{
DEFAULT_WORKFLOW_TYPE, canned_histories,
coresdk::{
ActivityTaskCompletion, IntoCompletion,
ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion,
activity_result::ActivityExecutionResult,
workflow_activation::{WorkflowActivationJob, workflow_activation_job},
workflow_commands::{
Expand Down Expand Up @@ -1061,7 +1061,7 @@ async fn pass_timer_summary_to_metadata() {
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let wf_id = mock_cfg.hists[0].wf_id.clone();
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"timer summary".into()),
summary: Some("timer summary".as_json_payload().unwrap()),
details: None,
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
Expand Down
11 changes: 10 additions & 1 deletion crates/sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ impl BaseWorkflowContext {
let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
let (cmd, unblocker) =
CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: &SerializationContextData::Workflow,
converter: &payload_converter,
};
self.send(
CommandCreateRequest {
cmd: WorkflowCommand {
Expand All @@ -436,7 +441,11 @@ impl BaseWorkflowContext {
.into(),
),
user_metadata: Some(UserMetadata {
summary: opts.summary.map(|x| x.as_bytes().into()),
summary: opts.summary.map(|summary| {
payload_converter
.to_payload(&context, &summary)
.expect("String-to-JSON payload serialization is infallible")
}),
details: None,
}),
},
Expand Down
62 changes: 42 additions & 20 deletions crates/sdk/src/workflow_context/options.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use std::{collections::HashMap, time::Duration};

use temporalio_client::Priority;
use temporalio_common::protos::{
coresdk::{
AsJsonPayloadExt,
child_workflow::ChildWorkflowCancellationType,
common::VersioningIntent,
nexus::NexusOperationCancellationType,
workflow_commands::{
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
WorkflowCommand,
},
use temporalio_common::{
data_converters::{
GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
},
temporal::api::{
common::v1::{Payload, RetryPolicy, SearchAttributes},
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
sdk::v1::UserMetadata,
protos::{
coresdk::{
child_workflow::ChildWorkflowCancellationType,
common::VersioningIntent,
nexus::NexusOperationCancellationType,
workflow_commands::{
ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
WorkflowCommand,
},
},
temporal::api::{
common::v1::{Payload, RetryPolicy, SearchAttributes},
enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
sdk::v1::UserMetadata,
},
},
};
// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
Expand Down Expand Up @@ -81,6 +85,11 @@ impl ActivityOptions {
arguments: Vec<Payload>,
seq: u32,
) -> WorkflowCommand {
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: &SerializationContextData::Workflow,
converter: &payload_converter,
};
WorkflowCommand {
variant: Some(
ScheduleActivity {
Expand Down Expand Up @@ -113,7 +122,8 @@ impl ActivityOptions {
user_metadata: self
.summary
.map(|s| {
s.as_json_payload()
payload_converter
.to_payload(&context, &s)
.expect("String-to-JSON payload serialization is infallible")
})
.map(|summary| UserMetadata {
Expand Down Expand Up @@ -171,6 +181,11 @@ impl LocalActivityOptions {
arguments: Vec<Payload>,
seq: u32,
) -> WorkflowCommand {
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: &SerializationContextData::Workflow,
converter: &payload_converter,
};
// Allow tests to avoid extra verbosity when they don't care about timeouts
// TODO: Builderize LA options
self.schedule_to_close_timeout
Expand Down Expand Up @@ -209,8 +224,8 @@ impl LocalActivityOptions {
user_metadata: self
.summary
.map(|summary| {
summary
.as_json_payload()
payload_converter
.to_payload(&context, &summary)
.expect("String-to-JSON payload serialization is infallible")
})
.map(|summary| UserMetadata {
Expand Down Expand Up @@ -261,14 +276,21 @@ impl ChildWorkflowOptions {
input: Vec<Payload>,
seq: u32,
) -> WorkflowCommand {
let payload_converter = PayloadConverter::default();
let context = SerializationContext {
data: &SerializationContextData::Workflow,
converter: &payload_converter,
};
let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
Some(UserMetadata {
summary: self.static_summary.map(|s| {
s.as_json_payload()
payload_converter
.to_payload(&context, &s)
.expect("String-to-JSON payload serialization is infallible")
}),
details: self.static_details.map(|s| {
s.as_json_payload()
payload_converter
.to_payload(&context, &s)
.expect("String-to-JSON payload serialization is infallible")
}),
})
Expand Down
Loading