Skip to content

Commit cda1fdb

Browse files
authored
Merge branch 'master' into optimize-deps
2 parents 2773f67 + 904f479 commit cda1fdb

10 files changed

Lines changed: 174 additions & 44 deletions

File tree

crates/client/src/lib.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@ use std::{
7171
};
7272
use temporalio_common::{
7373
HasWorkflowDefinition,
74-
data_converters::{DataConverter, SerializationContextData},
74+
data_converters::{
75+
DataConverter, GenericPayloadConverter, PayloadConverter, SerializationContext,
76+
SerializationContextData,
77+
},
7578
protos::{
76-
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
79+
coresdk::IntoPayloadsExt,
7780
grpc::health::v1::health_client::HealthClient,
7881
proto_ts_to_system_time,
7982
temporal::api::{
@@ -1063,13 +1066,20 @@ where
10631066

10641067
let user_metadata = if options.static_summary.is_some() || options.static_details.is_some()
10651068
{
1069+
let payload_converter = PayloadConverter::default();
1070+
let context = SerializationContext {
1071+
data: &SerializationContextData::Workflow,
1072+
converter: &payload_converter,
1073+
};
10661074
Some(UserMetadata {
10671075
summary: options.static_summary.map(|s| {
1068-
s.as_json_payload()
1076+
payload_converter
1077+
.to_payload(&context, &s)
10691078
.expect("String-to-JSON payload serialization is infallible")
10701079
}),
10711080
details: options.static_details.map(|s| {
1072-
s.as_json_payload()
1081+
payload_converter
1082+
.to_payload(&context, &s)
10731083
.expect("String-to-JSON payload serialization is infallible")
10741084
}),
10751085
})

crates/client/src/workflow_handle.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{fmt::Debug, marker::PhantomData};
1212
pub use temporalio_common::UntypedWorkflow;
1313
use temporalio_common::{
1414
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
15-
data_converters::{DataConverter, PayloadConversionError, RawValue, SerializationContextData},
15+
data_converters::{
16+
DataConverter, GenericPayloadConverter, PayloadConversionError, PayloadConverter, RawValue,
17+
SerializationContext, SerializationContextData,
18+
},
1619
payload_visitor::decode_payloads,
1720
protos::{
1821
coresdk::FromPayloadsExt,
@@ -48,21 +51,25 @@ struct DecodedUserMetadata {
4851
details: Option<String>,
4952
}
5053

51-
async fn decode_user_metadata(
52-
data_converter: &DataConverter,
54+
fn decode_user_metadata(
5355
context: &SerializationContextData,
5456
user_metadata: Option<UserMetadata>,
5557
) -> Result<DecodedUserMetadata, PayloadConversionError> {
58+
let payload_converter = PayloadConverter::default();
59+
let context = SerializationContext {
60+
data: context,
61+
converter: &payload_converter,
62+
};
5663
let (summary, details) = user_metadata
5764
.map(|metadata| (metadata.summary, metadata.details))
5865
.unwrap_or_default();
5966
Ok(DecodedUserMetadata {
6067
summary: match summary {
61-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
68+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6269
None => None,
6370
},
6471
details: match details {
65-
Some(payload) => Some(data_converter.from_payload(context, payload).await?),
72+
Some(payload) => Some(payload_converter.from_payload(&context, payload)?),
6673
None => None,
6774
},
6875
})
@@ -119,12 +126,8 @@ impl WorkflowExecutionDescription {
119126
&SerializationContextData::Workflow,
120127
)
121128
.await;
122-
let decoded_metadata = decode_user_metadata(
123-
data_converter,
124-
&SerializationContextData::Workflow,
125-
raw_user_metadata,
126-
)
127-
.await?;
129+
let decoded_metadata =
130+
decode_user_metadata(&SerializationContextData::Workflow, raw_user_metadata)?;
128131
let history_length_raw = raw_description
129132
.workflow_execution_info
130133
.as_ref()

crates/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [
6868
"grpc-tonic",
6969
"reqwest-rustls",
7070
], optional = true }
71-
parking_lot = { version = "0.12", features = ["send_guard"] }
71+
parking_lot = { version = "0.12" }
7272
prometheus = { version = "0.14", optional = true, default-features = false }
7373
prost = { workspace = true }
7474
prost-wkt = "0.7"

crates/sdk-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [
6464
"grpc-tonic",
6565
"reqwest-rustls",
6666
], optional = true }
67-
parking_lot = { version = "0.12", features = ["send_guard"] }
67+
parking_lot = { version = "0.12" }
6868
pid = "4.0"
6969
pin-project = "1.1"
7070
prost = { workspace = true }

crates/sdk-core/tests/integ_tests/client_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ async fn http_proxy() {
375375
opts.set_skip_get_system_info(true);
376376

377377
// Connect client with no proxy and make call and confirm reached
378-
opts.target = format!("http://127.0.0.1:{}", server.addr.port())
378+
opts.target = format!("http://[::1]:{}", server.addr.port())
379379
.parse()
380380
.unwrap();
381381
let connection = Connection::connect(opts.clone()).await.unwrap();

crates/sdk-core/tests/integ_tests/data_converter_tests.rs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,19 +284,30 @@ async fn multi_args_serializes_as_multiple_payloads() {
284284
/// A codec that XORs payload data with a key and tracks encode/decode operations.
285285
struct XorCodec {
286286
key: u8,
287+
gate_on_metadata: bool,
287288
encode_count: AtomicUsize,
288289
decode_count: AtomicUsize,
289290
}
290291

291292
impl XorCodec {
292293
fn new(key: u8) -> Self {
293294
Self {
295+
gate_on_metadata: true,
294296
key,
295297
encode_count: AtomicUsize::new(0),
296298
decode_count: AtomicUsize::new(0),
297299
}
298300
}
299301

302+
fn new_with_metadata_gate(key: u8, gate_on_metadata: bool) -> Self {
303+
Self {
304+
key,
305+
gate_on_metadata,
306+
encode_count: AtomicUsize::new(0),
307+
decode_count: AtomicUsize::new(0),
308+
}
309+
}
310+
300311
fn encode_count(&self) -> usize {
301312
self.encode_count.load(Ordering::SeqCst)
302313
}
@@ -316,12 +327,15 @@ impl PayloadCodec for XorCodec {
316327
eprintln!("XorCodec::encode called with {} payloads", count);
317328
self.encode_count.fetch_add(count, Ordering::SeqCst);
318329
let key = self.key;
330+
let gate_on_metadata = self.gate_on_metadata;
319331
async move {
320332
payloads
321333
.into_iter()
322334
.map(|mut p| {
323335
p.data = p.data.iter().map(|b| b ^ key).collect();
324-
p.metadata.insert("xor_encoded".to_string(), vec![key]);
336+
if gate_on_metadata {
337+
p.metadata.insert("xor_encoded".to_string(), vec![key]);
338+
}
325339
p
326340
})
327341
.collect()
@@ -338,11 +352,12 @@ impl PayloadCodec for XorCodec {
338352
eprintln!("XorCodec::decode called with {} payloads", count);
339353
self.decode_count.fetch_add(count, Ordering::SeqCst);
340354
let key = self.key;
355+
let gate_on_metadata = self.gate_on_metadata;
341356
async move {
342357
payloads
343358
.into_iter()
344359
.map(|mut p| {
345-
if p.metadata.remove("xor_encoded").is_some() {
360+
if !gate_on_metadata || p.metadata.remove("xor_encoded").is_some() {
346361
p.data = p.data.iter().map(|b| b ^ key).collect();
347362
}
348363
p
@@ -464,6 +479,77 @@ async fn describe_decodes_workflow_payload_fields() {
464479
desc.memo().unwrap().fields["tracked"],
465480
"codec-describe".as_json_payload().unwrap()
466481
);
482+
let raw_user_metadata = desc
483+
.raw_description
484+
.execution_config
485+
.as_ref()
486+
.and_then(|cfg| cfg.user_metadata.as_ref())
487+
.expect("describe response should include user metadata");
488+
assert_eq!(
489+
raw_user_metadata.summary,
490+
Some("codec summary".as_json_payload().unwrap())
491+
);
492+
assert_eq!(
493+
raw_user_metadata.details,
494+
Some("codec details".as_json_payload().unwrap())
495+
);
496+
assert_eq!(desc.static_summary(), Some("codec summary"));
497+
assert_eq!(desc.static_details(), Some("codec details"));
498+
}
499+
500+
#[tokio::test]
501+
async fn describe_decodes_user_metadata_with_ungated_xor_codec() {
502+
let wf_name = DescribeDataConverterWorkflow::name();
503+
let codec = Arc::new(XorCodec::new_with_metadata_gate(0x42, false));
504+
505+
let connection = get_integ_connection(None).await;
506+
let data_converter = DataConverter::new(
507+
PayloadConverter::default(),
508+
DefaultFailureConverter,
509+
codec.clone(),
510+
);
511+
let client_opts = ClientOptions::new(integ_namespace())
512+
.data_converter(data_converter)
513+
.build();
514+
let client = Client::new(connection, client_opts).unwrap();
515+
516+
let mut starter = CoreWfStarter::new_with_overrides(wf_name, None, Some(client));
517+
starter.sdk_config.register_activities(TestActivities);
518+
starter.sdk_config.task_types = WorkerTaskTypes::all();
519+
starter
520+
.sdk_config
521+
.register_workflow::<DescribeDataConverterWorkflow>();
522+
let wf_id = starter.get_task_queue().to_owned();
523+
let mut worker = starter.worker().await;
524+
525+
let handle = worker
526+
.submit_workflow(
527+
DescribeDataConverterWorkflow::run,
528+
TrackedWrapper(TrackedValue::new("codec-describe".to_string())),
529+
WorkflowStartOptions::new(starter.get_task_queue(), wf_id)
530+
.static_summary("codec summary")
531+
.static_details("codec details")
532+
.build(),
533+
)
534+
.await
535+
.unwrap();
536+
worker.run_until_done().await.unwrap();
537+
538+
let decode_count_before = codec.decode_count();
539+
let desc = handle
540+
.describe(WorkflowDescribeOptions::default())
541+
.await
542+
.unwrap();
543+
544+
assert!(
545+
codec.decode_count() > decode_count_before,
546+
"Describe should have decoded response payloads"
547+
);
548+
assert_eq!(
549+
desc.memo().unwrap().fields["tracked"],
550+
"codec-describe".as_json_payload().unwrap()
551+
);
552+
// Making sure codec isn't used when decoding user metadata
467553
assert_eq!(desc.static_summary(), Some("codec summary"));
468554
assert_eq!(desc.static_details(), Some("codec details"));
469555
}

crates/sdk-core/tests/integ_tests/workflow_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use temporalio_common::{
4444
protos::{
4545
DEFAULT_WORKFLOW_TYPE, canned_histories,
4646
coresdk::{
47-
ActivityTaskCompletion, IntoCompletion,
47+
ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion,
4848
activity_result::ActivityExecutionResult,
4949
workflow_activation::{WorkflowActivationJob, workflow_activation_job},
5050
workflow_commands::{
@@ -1061,7 +1061,7 @@ async fn pass_timer_summary_to_metadata() {
10611061
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
10621062
let wf_id = mock_cfg.hists[0].wf_id.clone();
10631063
let expected_user_metadata = Some(UserMetadata {
1064-
summary: Some(b"timer summary".into()),
1064+
summary: Some("timer summary".as_json_payload().unwrap()),
10651065
details: None,
10661066
});
10671067
mock_cfg.completion_asserts_from_expectations(|mut asserts| {

crates/sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ futures-util = { version = "0.3", default-features = false, features = [
2121
"async-await-macro",
2222
] }
2323
gethostname = "1.0.2"
24-
parking_lot = { version = "0.12", features = ["send_guard"] }
24+
parking_lot = { version = "0.12" }
2525
prost-types = { workspace = true }
2626
serde = "1.0"
2727
thiserror = "2"

crates/sdk/src/workflow_context.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ impl BaseWorkflowContext {
421421
let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
422422
let (cmd, unblocker) =
423423
CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
424+
let payload_converter = PayloadConverter::default();
425+
let context = SerializationContext {
426+
data: &SerializationContextData::Workflow,
427+
converter: &payload_converter,
428+
};
424429
self.send(
425430
CommandCreateRequest {
426431
cmd: WorkflowCommand {
@@ -436,7 +441,11 @@ impl BaseWorkflowContext {
436441
.into(),
437442
),
438443
user_metadata: Some(UserMetadata {
439-
summary: opts.summary.map(|x| x.as_bytes().into()),
444+
summary: opts.summary.map(|summary| {
445+
payload_converter
446+
.to_payload(&context, &summary)
447+
.expect("String-to-JSON payload serialization is infallible")
448+
}),
440449
details: None,
441450
}),
442451
},

0 commit comments

Comments
 (0)