diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2f7464f42..41bc58eaf 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -66,6 +66,7 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [ "tls-roots", "http-proto", "grpc-tonic", + "reqwest-blocking-client", "reqwest-rustls", ], optional = true } parking_lot = { version = "0.12" } diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index 1d193cba8..ab7ded095 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -62,6 +62,7 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [ "tls-roots", "http-proto", "grpc-tonic", + "reqwest-blocking-client", "reqwest-rustls", ], optional = true } parking_lot = { version = "0.12" } diff --git a/crates/sdk-core/tests/common/workflows.rs b/crates/sdk-core/tests/common/workflows.rs index aa2f88d4f..538fd6908 100644 --- a/crates/sdk-core/tests/common/workflows.rs +++ b/crates/sdk-core/tests/common/workflows.rs @@ -32,10 +32,7 @@ impl LaProblemWorkflow { ctx.start_activity( StdActivities::delay, Duration::from_secs(15), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(20)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(20)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk-core/tests/heavy_tests.rs b/crates/sdk-core/tests/heavy_tests.rs index 3eeefebfa..b13981418 100644 --- a/crates/sdk-core/tests/heavy_tests.rs +++ b/crates/sdk-core/tests/heavy_tests.rs @@ -39,7 +39,7 @@ use temporalio_common::{ worker::WorkerTaskTypes, }; use temporalio_sdk::{ - ActivityOptions, SyncWorkflowContext, WorkflowContext, WorkflowResult, + ActivityCloseTimeouts, ActivityOptions, SyncWorkflowContext, WorkflowContext, WorkflowResult, activities::{ActivityContext, ActivityError}, workflows, }; @@ -60,16 +60,16 @@ impl ActivityLoadWf { .start_activity( StdActivities::echo, input_str.clone(), - ActivityOptions { - activity_id: Some("act-1".to_string()), - task_queue: Some(tq), - schedule_to_start_timeout: Some(Duration::from_secs(8)), - start_to_close_timeout: Some(Duration::from_secs(8)), - schedule_to_close_timeout: Some(Duration::from_secs(8)), - heartbeat_timeout: Some(Duration::from_secs(8)), - cancellation_type: ActivityCancellationType::TryCancel, - ..Default::default() - }, + ActivityOptions::with_close_timeouts(ActivityCloseTimeouts::Both { + start_to_close: Duration::from_secs(8), + schedule_to_close: Duration::from_secs(8), + }) + .activity_id("act-1".to_string()) + .task_queue(tq) + .schedule_to_start_timeout(Duration::from_secs(8)) + .heartbeat_timeout(Duration::from_secs(8)) + .cancellation_type(ActivityCancellationType::TryCancel) + .build(), ) .await?; assert_eq!(res, input_str); @@ -129,11 +129,9 @@ impl ChunkyActivityWf { .start_activity( ChunkyActivities::chunky_echo, input_str.clone(), - ActivityOptions { - activity_id: Some("act-1".to_string()), - start_to_close_timeout: Some(Duration::from_secs(30)), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(30)) + .activity_id("act-1".to_string()) + .build(), ) .await?; assert_eq!(res, input_str); @@ -222,10 +220,7 @@ impl WorkflowLoadWf { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; ctx.timer(Duration::from_secs(1)).await; @@ -439,10 +434,7 @@ impl PollerLoadWf { .start_activity( JitteryActivities::jittery_echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; } diff --git a/crates/sdk-core/tests/heavy_tests/fuzzy_workflow.rs b/crates/sdk-core/tests/heavy_tests/fuzzy_workflow.rs index 2e2e84175..496808087 100644 --- a/crates/sdk-core/tests/heavy_tests/fuzzy_workflow.rs +++ b/crates/sdk-core/tests/heavy_tests/fuzzy_workflow.rs @@ -54,10 +54,7 @@ impl FuzzyWf { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; } diff --git a/crates/sdk-core/tests/integ_tests/async_activity_client_tests.rs b/crates/sdk-core/tests/integ_tests/async_activity_client_tests.rs index 4548d48f3..6f95ba75b 100644 --- a/crates/sdk-core/tests/integ_tests/async_activity_client_tests.rs +++ b/crates/sdk-core/tests/integ_tests/async_activity_client_tests.rs @@ -110,15 +110,13 @@ async fn async_activity_completions( let activity_future = ctx.start_activity( AsyncActivities::complete_async_activity, expected_outcome, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(30)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(30)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - cancellation_type: ActivityCancellationType::WaitCancellationCompleted, - ..Default::default() - }, + }) + .cancellation_type(ActivityCancellationType::WaitCancellationCompleted) + .build(), ); // For cancellation, wait a bit to let the activity start, then request cancel diff --git a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs index fc3f770b4..7145004d5 100644 --- a/crates/sdk-core/tests/integ_tests/data_converter_tests.rs +++ b/crates/sdk-core/tests/integ_tests/data_converter_tests.rs @@ -105,10 +105,7 @@ impl DataConverterTestWorkflow { .start_activity( TestActivities::process_tracked, input, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; @@ -132,10 +129,7 @@ impl DescribeDataConverterWorkflow { .start_activity( TestActivities::process_tracked, input, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk-core/tests/integ_tests/heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/heartbeat_tests.rs index 80551b3ec..f9050f72c 100644 --- a/crates/sdk-core/tests/integ_tests/heartbeat_tests.rs +++ b/crates/sdk-core/tests/integ_tests/heartbeat_tests.rs @@ -42,15 +42,13 @@ impl ActivityDoesntHeartbeatHitsTimeoutThenCompletesWf { .start_activity( StdActivities::delay, Duration::from_secs(4), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - heartbeat_timeout: Some(Duration::from_secs(2)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(10)) + .heartbeat_timeout(Duration::from_secs(2)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - ..Default::default() - }, + }) + .build(), ) .await; let err = res.unwrap_err(); diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs index 139bff508..4e606d5e6 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -857,22 +857,17 @@ async fn activity_metrics() { let normal_act_pass = ctx.start_activity( PassFailActivities::pass_fail_act, "pass".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(1)), ); let normal_act_fail = ctx.start_activity( PassFailActivities::pass_fail_act, "fail".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(1)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - ..Default::default() - }, + }) + .build(), ); let _ = join!(normal_act_pass, normal_act_fail); let local_act_pass = ctx.start_local_activity( diff --git a/crates/sdk-core/tests/integ_tests/polling_tests.rs b/crates/sdk-core/tests/integ_tests/polling_tests.rs index 19206c1bb..ee78fafbc 100644 --- a/crates/sdk-core/tests/integ_tests/polling_tests.rs +++ b/crates/sdk-core/tests/integ_tests/polling_tests.rs @@ -265,10 +265,7 @@ impl OnlyOneWorkflowSlotAndTwoPollers { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; } diff --git a/crates/sdk-core/tests/integ_tests/update_tests.rs b/crates/sdk-core/tests/integ_tests/update_tests.rs index 406f42508..34140f204 100644 --- a/crates/sdk-core/tests/integ_tests/update_tests.rs +++ b/crates/sdk-core/tests/integ_tests/update_tests.rs @@ -1119,10 +1119,7 @@ async fn worker_restarted_in_middle_of_update() { ctx.start_activity( BlockingActivities::blocks, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(2)), ) .await?; Ok(()) @@ -1223,10 +1220,7 @@ async fn update_after_empty_wft() { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(2)), ) .await; ACT_STARTED.store(false, Ordering::Release); @@ -1251,10 +1245,7 @@ async fn update_after_empty_wft() { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(2)), ) .await; } @@ -1324,10 +1315,7 @@ async fn update_lost_on_activity_mismatch() { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(2)), ) .await; ctx.state_mut(|s| s.can_run -= 1); diff --git a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs index 0831f7a3c..4a7d1cc3e 100644 --- a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs @@ -203,10 +203,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b .start_activity( NotifyActivities::pass_fail_act, "pass".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; Ok(()) @@ -356,10 +353,7 @@ async fn docker_worker_heartbeat_tuner() { .start_activity( StdActivities::echo, "pass".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(1)), ) .await; Ok(()) @@ -661,10 +655,7 @@ async fn worker_heartbeat_sticky_cache_miss() { .start_activity( StickyCacheActivities::sticky_cache_history_act, wf_marker.clone(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; @@ -905,16 +896,14 @@ async fn worker_heartbeat_failure_metrics() { .start_activity( FailingActivities::failing_act, "boom".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .retry_policy(RetryPolicy { initial_interval: Some(prost_dur!(from_millis(10))), backoff_coefficient: 1.0, maximum_attempts: 4, ..Default::default() - }), - ..Default::default() - }, + }) + .build(), ) .await; @@ -1048,10 +1037,7 @@ async fn worker_heartbeat_no_runtime_heartbeat() { .start_activity( StdActivities::echo, "pass".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(1)), ) .await; Ok(()) @@ -1121,10 +1107,7 @@ async fn worker_heartbeat_skip_client_worker_set_check() { .start_activity( StdActivities::echo, "pass".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(1)), ) .await; Ok(()) diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index 60def06ed..215d56490 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -414,12 +414,20 @@ async fn activity_tasks_from_completion_reserve_slots() { impl ActivityTasksCompletionWf { #[run(name = DEFAULT_WORKFLOW_TYPE)] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - ctx.start_activity(FakeAct::act1, (), ActivityOptions::default()) - .await - .map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?; - ctx.start_activity(FakeAct::act2, (), ActivityOptions::default()) - .await - .map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?; + ctx.start_activity( + FakeAct::act1, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) + .await + .map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?; + ctx.start_activity( + FakeAct::act2, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) + .await + .map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?; ctx.state(|wf| wf.complete_token.cancel()); Ok(()) } @@ -782,10 +790,7 @@ async fn test_custom_slot_supplier_simple() { .start_activity( StdActivities::no_op, (), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)), ) .await; let _result = ctx diff --git a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs index c9cb53cfa..9a6519e31 100644 --- a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs @@ -300,10 +300,7 @@ impl ActivityHasDeploymentStampWf { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; Ok(()) diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests.rs b/crates/sdk-core/tests/integ_tests/workflow_tests.rs index 70e7c368f..dad1327bd 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests.rs @@ -504,10 +504,7 @@ impl SlowCompletesWf { ctx.start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; @@ -883,10 +880,7 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to( ctx.start_activity( StdActivities::echo, "hi".to_owned(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; @@ -968,10 +962,7 @@ async fn history_out_of_order_on_restart() { ctx.start_activity( StdActivities::echo, "hi".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; @@ -1004,10 +995,7 @@ async fn history_out_of_order_on_restart() { ctx.start_activity( StdActivities::echo, "hi".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs index 45ac5222c..af71a1d1c 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs @@ -70,10 +70,7 @@ impl OneActivityWorkflow { .start_activity( StdActivities::echo, input, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow!("{e}"))?; @@ -93,10 +90,7 @@ impl MultiArgActivityWorkflow { .start_activity( StdActivities::concat, (input, " world".to_string()), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await .map_err(|e| anyhow!("{e}"))?; @@ -963,11 +957,9 @@ impl OneActivityAbandonCancelledBeforeStarted { let act_fut = ctx.start_activity( StdActivities::delay, Duration::from_secs(2), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - cancellation_type: ActivityCancellationType::Abandon, - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .cancellation_type(ActivityCancellationType::Abandon) + .build(), ); act_fut.cancel(); let _ = act_fut.await; @@ -1009,11 +1001,9 @@ impl OneActivityAbandonCancelledAfterComplete { let act_fut = ctx.start_activity( StdActivities::delay, Duration::from_secs(2), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - cancellation_type: ActivityCancellationType::Abandon, - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .cancellation_type(ActivityCancellationType::Abandon) + .build(), ); ctx.timer(Duration::from_secs(1)).await; act_fut.cancel(); @@ -1094,15 +1084,13 @@ async fn graceful_shutdown() { ctx.start_activity( SleeperActivities::sleeper, "hi".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - cancellation_type: ActivityCancellationType::WaitCancellationCompleted, - ..Default::default() - }, + }) + .cancellation_type(ActivityCancellationType::WaitCancellationCompleted) + .build(), ) }); temporalio_sdk::workflows::join_all(act_futs).await; @@ -1188,14 +1176,12 @@ async fn activity_can_be_cancelled_by_local_timeout() { .start_activity( CancellableEchoActivities::cancellable_echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(1)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - ..Default::default() - }, + }) + .build(), ) .await; assert!(res.is_err_and(|e| e.is_timeout())); @@ -1254,14 +1240,12 @@ async fn long_activity_timeout_repro() { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - retry_policy: Some(RetryPolicy { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(1)) + .retry_policy(RetryPolicy { maximum_attempts: 1, ..Default::default() - }), - ..Default::default() - }, + }) + .build(), ) .await; assert!(res.is_ok()); @@ -1322,10 +1306,9 @@ async fn pass_activity_summary_to_metadata() { ctx.start_activity( StdActivities::default, (), - ActivityOptions { - summary: Some("activity summary".to_string()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .summary("activity summary".to_string()) + .build(), ) .await .map_err(|e| anyhow!("{e}"))?; @@ -1385,11 +1368,9 @@ async fn abandoned_activities_ignore_start_and_complete(hist_batches: &'static [ let act_fut = ctx.start_activity( StdActivities::default, (), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - cancellation_type: ActivityCancellationType::Abandon, - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .cancellation_type(ActivityCancellationType::Abandon) + .build(), ); ctx.timer(Duration::from_secs(1)).await; act_fut.cancel(); @@ -1420,8 +1401,11 @@ struct ImmediateActivityCancelationWorkflow; impl ImmediateActivityCancelationWorkflow { #[run(name = DEFAULT_WORKFLOW_TYPE)] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - let cancel_activity_future = - ctx.start_activity(StdActivities::default, (), ActivityOptions::default()); + let cancel_activity_future = ctx.start_activity( + StdActivities::default, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ); cancel_activity_future.cancel(); let _ = cancel_activity_future.await; Ok(()) diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs index 29899db2d..c3e64bb61 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs @@ -53,9 +53,13 @@ impl TimerWfNondeterministic { } } 2 => { - ctx.start_activity(StdActivities::default, (), ActivityOptions::default()) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + ctx.start_activity( + StdActivities::default, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; } _ => panic!("Ran too many times"), } @@ -104,10 +108,7 @@ impl TaskFailReplayWf { .start_activity( StdActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(2)), ) .await; if !ctx.state(|wf| wf.did_fail.load(Ordering::Relaxed)) { @@ -301,17 +302,20 @@ impl ActivityIdOrTypeChangeWf { ctx.start_activity( StdActivities::default, (), - ActivityOptions { - activity_id: Some("I'm bad and wrong!".to_string()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("I'm bad and wrong!".to_string()) + .build(), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; } else { - ctx.start_activity(StdActivities::no_op, (), ActivityOptions::default()) - .await - .map_err(|e| anyhow::anyhow!("{e}"))?; + ctx.start_activity( + StdActivities::no_op, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) + .await + .map_err(|e| anyhow::anyhow!("{e}"))?; } Ok(()) } diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index 2dff951a0..9242da1fe 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -1024,11 +1024,9 @@ async fn la_resolve_same_time_as_other_cancel() { let mut normal_act = ctx.start_activity( DelayWithCancellation::delay, Duration::from_secs(9), - ActivityOptions { - cancellation_type: ActivityCancellationType::TryCancel, - start_to_close_timeout: Some(Duration::from_secs(9000)), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(9000)) + .cancellation_type(ActivityCancellationType::TryCancel) + .build(), ); // Make new task ctx.timer(Duration::from_millis(1)).await; diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs index 283e4d10c..bab2d1105 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs @@ -410,10 +410,9 @@ async fn v1(ctx: &mut WorkflowContext) { .start_activity( FakeAct::nameless, (), - ActivityOptions { - activity_id: Some("no_change".to_owned()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("no_change".to_owned()) + .build(), ) .await; } @@ -424,10 +423,9 @@ async fn v2(ctx: &mut WorkflowContext) -> bool { .start_activity( FakeAct::nameless, (), - ActivityOptions { - activity_id: Some("had_change".to_owned()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("had_change".to_owned()) + .build(), ) .await; true @@ -436,10 +434,9 @@ async fn v2(ctx: &mut WorkflowContext) -> bool { .start_activity( FakeAct::nameless, (), - ActivityOptions { - activity_id: Some("no_change".to_owned()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("no_change".to_owned()) + .build(), ) .await; false @@ -452,10 +449,9 @@ async fn v3(ctx: &mut WorkflowContext) { .start_activity( FakeAct::nameless, (), - ActivityOptions { - activity_id: Some("had_change".to_owned()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("had_change".to_owned()) + .build(), ) .await; } @@ -465,10 +461,9 @@ async fn v4(ctx: &mut WorkflowContext) { .start_activity( FakeAct::nameless, (), - ActivityOptions { - activity_id: Some("had_change".to_owned()), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .activity_id("had_change".to_owned()) + .build(), ) .await; } @@ -682,7 +677,11 @@ impl SameChangeMultipleSpotsWf { async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { if ctx.patched(MY_PATCH_ID) { let _ = ctx - .start_activity(FakeAct::nameless, (), ActivityOptions::default()) + .start_activity( + FakeAct::nameless, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) .await; } else { ctx.timer(ONE_SECOND).await; @@ -690,7 +689,11 @@ impl SameChangeMultipleSpotsWf { ctx.timer(ONE_SECOND).await; if ctx.patched(MY_PATCH_ID) { let _ = ctx - .start_activity(FakeAct::nameless, (), ActivityOptions::default()) + .start_activity( + FakeAct::nameless, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) .await; } else { ctx.timer(ONE_SECOND).await; diff --git a/crates/sdk-core/tests/manual_tests.rs b/crates/sdk-core/tests/manual_tests.rs index f2b430349..cee13a154 100644 --- a/crates/sdk-core/tests/manual_tests.rs +++ b/crates/sdk-core/tests/manual_tests.rs @@ -60,10 +60,7 @@ impl PollerLoadSpikyWf { .start_activity( JitteryEchoActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; } @@ -110,10 +107,7 @@ impl PollerLoadSpikeThenSustainedWf { .start_activity( JitteryEchoActivities::echo, "hi!".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ) .await; } diff --git a/crates/sdk-core/tests/shared_tests/priority.rs b/crates/sdk-core/tests/shared_tests/priority.rs index dafc0d15f..1be6923be 100644 --- a/crates/sdk-core/tests/shared_tests/priority.rs +++ b/crates/sdk-core/tests/shared_tests/priority.rs @@ -74,16 +74,14 @@ pub(crate) async fn priority_values_sent_to_server() { let activity = ctx.start_activity( PriorityActivities::echo, "hello".to_string(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(5)), - priority: Some(Priority { + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .priority(Priority { priority_key: Some(5), fairness_key: Some("fair-act".to_string()), fairness_weight: Some(1.1), - }), - do_not_eagerly_execute: true, - ..Default::default() - }, + }) + .do_not_eagerly_execute(true) + .build(), ); let _ = started.result().await; let _ = activity.await; diff --git a/crates/sdk/README.md b/crates/sdk/README.md index 68eff9092..2a519fcca 100644 --- a/crates/sdk/README.md +++ b/crates/sdk/README.md @@ -68,10 +68,7 @@ impl GreetingWorkflow { let greeting = ctx.start_activity( MyActivities::greet, name, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - } + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)) )?.await?; Ok(greeting) diff --git a/crates/sdk/examples/activity_heartbeating/workflows.rs b/crates/sdk/examples/activity_heartbeating/workflows.rs index 883692360..8363b878a 100644 --- a/crates/sdk/examples/activity_heartbeating/workflows.rs +++ b/crates/sdk/examples/activity_heartbeating/workflows.rs @@ -48,11 +48,9 @@ impl HeartbeatingWorkflow { .start_activity( HeartbeatingActivities::long_running_activity, total_steps, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(30)), - heartbeat_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(30)) + .heartbeat_timeout(Duration::from_secs(5)) + .build(), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk/examples/cancellation/workflows.rs b/crates/sdk/examples/cancellation/workflows.rs index acca3565d..774b80d5c 100644 --- a/crates/sdk/examples/cancellation/workflows.rs +++ b/crates/sdk/examples/cancellation/workflows.rs @@ -31,11 +31,9 @@ impl CancellationActivities { } fn activity_opts() -> ActivityOptions { - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(300)), - heartbeat_timeout: Some(Duration::from_secs(5)), - ..Default::default() - } + ActivityOptions::with_start_to_close_timeout(Duration::from_secs(300)) + .heartbeat_timeout(Duration::from_secs(5)) + .build() } #[workflow] @@ -64,10 +62,7 @@ impl CancellationWorkflow { .start_activity( CancellationActivities::cleanup, (), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk/examples/hello_world/workflows.rs b/crates/sdk/examples/hello_world/workflows.rs index 8435abdb0..ff943a692 100644 --- a/crates/sdk/examples/hello_world/workflows.rs +++ b/crates/sdk/examples/hello_world/workflows.rs @@ -18,10 +18,7 @@ impl HelloWorldWorkflow { .start_activity( GreetingActivities::greet, name, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk/examples/local_activities/workflows.rs b/crates/sdk/examples/local_activities/workflows.rs index 38e0613da..57e1dcdfd 100644 --- a/crates/sdk/examples/local_activities/workflows.rs +++ b/crates/sdk/examples/local_activities/workflows.rs @@ -31,10 +31,7 @@ impl LocalActivitiesWorkflow { .start_activity( GreetingActivities::greet, name.clone(), - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk/examples/polling/workflows.rs b/crates/sdk/examples/polling/workflows.rs index d2da9079a..5a9ae2590 100644 --- a/crates/sdk/examples/polling/workflows.rs +++ b/crates/sdk/examples/polling/workflows.rs @@ -20,10 +20,7 @@ impl PollingActivities { } fn activity_opts() -> ActivityOptions { - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - } + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)) } #[workflow] diff --git a/crates/sdk/examples/saga/workflows.rs b/crates/sdk/examples/saga/workflows.rs index 7cdf8ce66..bd87244d6 100644 --- a/crates/sdk/examples/saga/workflows.rs +++ b/crates/sdk/examples/saga/workflows.rs @@ -168,8 +168,5 @@ impl BookingActivities { } fn activity_opts() -> ActivityOptions { - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(1)), - ..Default::default() - } + ActivityOptions::start_to_close_timeout(Duration::from_secs(1)) } diff --git a/crates/sdk/examples/schedules/workflows.rs b/crates/sdk/examples/schedules/workflows.rs index fba2e29f8..a586d7b41 100644 --- a/crates/sdk/examples/schedules/workflows.rs +++ b/crates/sdk/examples/schedules/workflows.rs @@ -28,10 +28,7 @@ impl ScheduledWorkflow { .start_activity( ScheduledActivities::greet, name, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(10)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(10)), ) .await .map_err(|e| anyhow::anyhow!("{e}"))?; diff --git a/crates/sdk/examples/timer_examples/workflows.rs b/crates/sdk/examples/timer_examples/workflows.rs index 3f19d347d..0f738361c 100644 --- a/crates/sdk/examples/timer_examples/workflows.rs +++ b/crates/sdk/examples/timer_examples/workflows.rs @@ -35,10 +35,7 @@ impl TimerWorkflow { result = ctx.start_activity( TimerActivities::slow_activity, 100u64, - ActivityOptions { - start_to_close_timeout: Some(Duration::from_secs(30)), - ..Default::default() - }, + ActivityOptions::start_to_close_timeout(Duration::from_secs(30)), ) => { result.map_err(|e| anyhow::anyhow!("{e}"))?; "activity" diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index 35775582a..6805476cb 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -98,8 +98,8 @@ use workflow_future::WorkflowFunction; pub use temporalio_client::Namespace; pub use workflow_context::{ - ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture, - ChildWorkflowExecutionError, ChildWorkflowOptions, ChildWorkflowSignalError, + ActivityCloseTimeouts, ActivityExecutionError, ActivityOptions, BaseWorkflowContext, + CancellableFuture, ChildWorkflowExecutionError, ChildWorkflowOptions, ChildWorkflowSignalError, ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext, @@ -1375,11 +1375,15 @@ mod tests { #[allow(unused, clippy::diverging_sub_expression)] fn test_activity_via_workflow_context() { let wf_ctx: WorkflowContext = unimplemented!(); - wf_ctx.start_activity(MyActivities::my_activity, (), ActivityOptions::default()); + wf_ctx.start_activity( + MyActivities::my_activity, + (), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ); wf_ctx.start_activity( MyActivities::takes_self, "Hi".to_owned(), - ActivityOptions::default(), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), ); } diff --git a/crates/sdk/src/workflow_context.rs b/crates/sdk/src/workflow_context.rs index 809e4d6ca..57afa51ec 100644 --- a/crates/sdk/src/workflow_context.rs +++ b/crates/sdk/src/workflow_context.rs @@ -1,8 +1,8 @@ mod options; pub use options::{ - ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions, LocalActivityOptions, - NexusOperationOptions, Signal, SignalData, TimerOptions, + ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions, + LocalActivityOptions, NexusOperationOptions, Signal, SignalData, TimerOptions, }; pub use temporalio_common::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause; diff --git a/crates/sdk/src/workflow_context/options.rs b/crates/sdk/src/workflow_context/options.rs index b2a75b927..b04ec71d2 100644 --- a/crates/sdk/src/workflow_context/options.rs +++ b/crates/sdk/src/workflow_context/options.rs @@ -31,8 +31,15 @@ pub(crate) trait IntoWorkflowCommand { } /// Options for scheduling an activity -#[derive(Default, Debug)] +#[derive(Debug, bon::Builder, Clone)] +#[non_exhaustive] +#[builder(start_fn = with_close_timeouts, on(String, into), state_mod(vis = "pub"))] pub struct ActivityOptions { + /// Timeouts for activity completion. + /// + /// See [`ActivityCloseTimeouts`] for the meaning of each timeout variant. + #[builder(start_fn)] + pub close_timeouts: ActivityCloseTimeouts, /// Identifier to use for tracking the activity in Workflow history. /// The `activityId` can be accessed by the activity function. /// Does not need to be unique. @@ -50,23 +57,11 @@ pub struct ActivityOptions { /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back /// into the same Task Queue. pub schedule_to_start_timeout: Option, - /// Maximum time of a single Activity execution attempt. - /// Note that the Temporal Server doesn't detect Worker process failures directly. - /// It relies on this timeout to detect that an Activity that didn't complete on time. - /// So this timeout should be as short as the longest possible execution of the Activity body. - /// Potentially long running Activities must specify `heartbeat_timeout` and heartbeat from the - /// activity periodically for timely failure detection. - /// Either this option or `schedule_to_close_timeout` is required. - pub start_to_close_timeout: Option, - /// Total time that a workflow is willing to wait for Activity to complete. - /// `schedule_to_close_timeout` limits the total time of an Activity's execution including - /// retries (use `start_to_close_timeout` to limit the time of a single attempt). - /// Either this option or `start_to_close_timeout` is required. - pub schedule_to_close_timeout: Option, /// Heartbeat interval. Activity must heartbeat before this interval passes after a last /// heartbeat or activity start. pub heartbeat_timeout: Option, /// Determines what the SDK does when the Activity is cancelled. + #[builder(default)] pub cancellation_type: ActivityCancellationType, /// Activity retry policy pub retry_policy: Option, @@ -75,10 +70,35 @@ pub struct ActivityOptions { /// Priority for the activity pub priority: Option, /// If true, disable eager execution for this activity + #[builder(default)] pub do_not_eagerly_execute: bool, } impl ActivityOptions { + /// Returns a builder with `close_timeout` set to [`ActivityCloseTimeouts::StartToClose`]. + pub fn with_start_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder { + Self::with_close_timeouts(ActivityCloseTimeouts::StartToClose(duration)) + } + + /// Returns a builder with `close_timeout` set to [`ActivityCloseTimeouts::ScheduleToClose`]. + pub fn with_schedule_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder { + Self::with_close_timeouts(ActivityCloseTimeouts::ScheduleToClose(duration)) + } + + /// Creates activity options with only `start_to_close_timeout` set. + /// + /// If you need additional fields set, use [`Self::with_start_to_close_timeout`]. + pub fn start_to_close_timeout(duration: Duration) -> Self { + Self::with_start_to_close_timeout(duration).build() + } + + /// Creates activity options with only `schedule_to_close_timeout` set. + /// + /// If you need additional fields set, use [`Self::with_schedule_to_close_timeout`]. + pub fn schedule_to_close_timeout(duration: Duration) -> Self { + Self::with_schedule_to_close_timeout(duration).build() + } + pub(crate) fn into_command( self, activity_type: String, @@ -90,6 +110,8 @@ impl ActivityOptions { data: &SerializationContextData::Workflow, converter: &payload_converter, }; + let (start_to_close_timeout, schedule_to_close_timeout) = + self.close_timeouts.into_durations(); WorkflowCommand { variant: Some( ScheduleActivity { @@ -100,15 +122,12 @@ impl ActivityOptions { }, activity_type, task_queue: self.task_queue.unwrap_or_default(), - schedule_to_close_timeout: self - .schedule_to_close_timeout + schedule_to_close_timeout: schedule_to_close_timeout .and_then(|d| d.try_into().ok()), schedule_to_start_timeout: self .schedule_to_start_timeout .and_then(|d| d.try_into().ok()), - start_to_close_timeout: self - .start_to_close_timeout - .and_then(|d| d.try_into().ok()), + start_to_close_timeout: start_to_close_timeout.and_then(|d| d.try_into().ok()), heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()), cancellation_type: self.cancellation_type as i32, arguments, @@ -134,6 +153,42 @@ impl ActivityOptions { } } +/// The timeouts applied to an activity's completion. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ActivityCloseTimeouts { + /// Total time that a workflow is willing to wait for Activity to complete. + /// `ActivityCloseTimeouts::ScheduleToClose` limits the total time of an Activity's execution including + /// retries (use `ActivityCloseTimeouts::StartToClose` to limit the time of a single attempt). + ScheduleToClose(Duration), + /// Maximum time of a single Activity execution attempt. + /// Note that the Temporal Server doesn't detect Worker process failures directly. + /// It relies on this timeout to detect that an Activity that didn't complete on time. + /// So this timeout should be as short as the longest possible execution of the Activity body. + /// Potentially long running Activities must specify `ActivityOptions::heartbeat_timeout` and heartbeat from the + /// activity periodically for timely failure detection. + StartToClose(Duration), + /// Applies both execution-attempt and overall-completion bounds. + Both { + /// Maximum time of a single Activity execution attempt. + start_to_close: Duration, + /// Total time that a workflow is willing to wait for Activity to complete. + schedule_to_close: Duration, + }, +} + +impl ActivityCloseTimeouts { + fn into_durations(self) -> (Option, Option) { + match self { + Self::ScheduleToClose(schedule_to_close) => (None, Some(schedule_to_close)), + Self::StartToClose(start_to_close) => (Some(start_to_close), None), + Self::Both { + start_to_close, + schedule_to_close, + } => (Some(start_to_close), Some(schedule_to_close)), + } + } +} + /// Options for scheduling a local activity #[derive(Default, Debug, Clone)] pub struct LocalActivityOptions { @@ -525,6 +580,50 @@ impl ContinueAsNewOptions { #[cfg(test)] mod tests { use super::*; + use temporalio_common::protos::coresdk::workflow_commands::workflow_command::Variant; + + #[test] + fn activity_options_with_start_to_close_timeout_wrapper_supports_builder_chaining() { + let opts = ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5)) + .heartbeat_timeout(Duration::from_secs(2)) + .build(); + + assert_eq!( + opts.close_timeouts, + ActivityCloseTimeouts::StartToClose(Duration::from_secs(5)) + ); + assert_eq!(opts.heartbeat_timeout, Some(Duration::from_secs(2))); + } + + #[test] + fn activity_options_with_schedule_to_close_timeout_wrapper_supports_builder_chaining() { + let opts = ActivityOptions::with_schedule_to_close_timeout(Duration::from_secs(5)) + .heartbeat_timeout(Duration::from_secs(2)) + .build(); + + assert_eq!( + opts.close_timeouts, + ActivityCloseTimeouts::ScheduleToClose(Duration::from_secs(5)) + ); + assert_eq!(opts.heartbeat_timeout, Some(Duration::from_secs(2))); + } + + #[test] + fn activity_options_both_close_timeouts_map_to_command() { + let cmd = ActivityOptions::with_close_timeouts(ActivityCloseTimeouts::Both { + start_to_close: Duration::from_secs(3), + schedule_to_close: Duration::from_secs(8), + }) + .build() + .into_command("test".to_string(), vec![], 7); + let schedule_cmd = match cmd.variant.unwrap() { + Variant::ScheduleActivity(cmd) => cmd, + other => panic!("Expected ScheduleActivity, got {other:?}"), + }; + + assert_eq!(schedule_cmd.start_to_close_timeout.unwrap().seconds, 3); + assert_eq!(schedule_cmd.schedule_to_close_timeout.unwrap().seconds, 8); + } #[test] fn child_workflow_run_timeout_uses_run_timeout_field() {