Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions crates/sdk-core/tests/integ_tests/data_converter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ impl DataConverterTestWorkflow {
input,
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;

Ok(output)
}
Expand All @@ -218,8 +217,7 @@ impl DescribeDataConverterWorkflow {
input,
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;

Ok(output)
}
Expand Down
7 changes: 2 additions & 5 deletions crates/sdk-core/tests/integ_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use temporalio_common::{
use temporalio_macros::{activities, workflow, workflow_methods};
use temporalio_sdk::{
ActivityOptions, LocalActivityOptions, WorkerOptions, WorkflowContext, WorkflowResult,
WorkflowTermination,
activities::{ActivityContext, ActivityError},
interceptors::WorkerInterceptor,
};
Expand Down Expand Up @@ -419,15 +418,13 @@ async fn activity_tasks_from_completion_reserve_slots() {
(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
ctx.start_activity(
FakeAct::act2,
(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
ctx.state(|wf| wf.complete_token.cancel());
Ok(())
}
Expand Down
18 changes: 6 additions & 12 deletions crates/sdk-core/tests/integ_tests/workflow_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,7 @@ impl SlowCompletesWf {
"hi!".to_string(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
ctx.timer(Duration::from_secs(1)).await;
}
Ok(())
Expand Down Expand Up @@ -881,8 +880,7 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to(
"hi".to_owned(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -956,15 +954,13 @@ async fn history_out_of_order_on_restart() {
..Default::default()
},
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
ctx.start_activity(
StdActivities::echo,
"hi".to_string(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
ctx.state(|wf| wf.hit_sleep.notify_one());
ctx.timer(Duration::from_secs(5)).await;
Ok(())
Expand All @@ -987,17 +983,15 @@ async fn history_out_of_order_on_restart() {
..Default::default()
},
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
// Timer is added after restarting workflow
ctx.timer(Duration::from_secs(1)).await;
ctx.start_activity(
StdActivities::echo,
"hi".to_string(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
ctx.timer(Duration::from_secs(2)).await;
Ok(())
}
Expand Down
31 changes: 15 additions & 16 deletions crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ impl OneActivityWorkflow {
input,
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow!("{e}"))?;
.await?;
Ok(r)
}
}
Expand All @@ -92,8 +91,7 @@ impl MultiArgActivityWorkflow {
(input, " world".to_string()),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow!("{e}"))?;
.await?;
Ok(r)
}
}
Expand Down Expand Up @@ -172,18 +170,19 @@ async fn activity_panics_are_retryable() {
impl ActivityPanicRetryWorkflow {
#[run]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<u32> {
ctx.start_activity(
PanicOnceActivities::panic_once,
(),
ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5))
.retry_policy(RetryPolicy {
maximum_attempts: 2,
..Default::default()
})
.build(),
)
.await
.map_err(|e| anyhow!("{e}").into())
let result = ctx
.start_activity(
PanicOnceActivities::panic_once,
(),
ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5))
.retry_policy(RetryPolicy {
maximum_attempts: 2,
..Default::default()
})
.build(),
)
.await?;
Ok(result)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl HappyParent {
)
.await
.expect("Child should start OK");
started.result().await.map_err(|e| anyhow!(e).into())
started.result().await?;
Ok(())
}
}

Expand Down Expand Up @@ -811,8 +812,7 @@ impl PassChildWorkflowSummaryToMetadata {
..Default::default()
},
)
.await
.map_err(|e| anyhow!(e))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -920,7 +920,7 @@ impl ParentWf {
_ => return Err(anyhow!("Expected start failure").into()),
}
}
let started = start_res.map_err(|e| anyhow!(e))?;
let started = start_res?;
match (expectation, started.result().await) {
(Expectation::Success, Ok(_)) => Ok(()),
(Expectation::Failure, Err(ChildWorkflowExecutionError::Failed(failure))) => {
Expand Down Expand Up @@ -1274,13 +1274,9 @@ impl UntypedHappyParent {
..Default::default()
},
)
.await
.map_err(|e| anyhow!(e))?;
started
.result()
.await
.map(|_| ())
.map_err(|e| anyhow!(e).into())
.await?;
started.result().await?;
Ok(())
}
}

Expand Down Expand Up @@ -1416,8 +1412,7 @@ impl ChildSignalSerializationFailParent {
..Default::default()
},
)
.await
.map_err(|e| anyhow!(e))?;
.await?;

let signal_result = started
.signal(UnserializableSignalChild::bad_signal, AlwaysFailsSerialize)
Expand Down Expand Up @@ -1484,8 +1479,7 @@ impl UnitChildParentWf {
..Default::default()
},
)
.await
.map_err(|e| anyhow!(e))?;
.await?;
started.result().await?;
Ok(())
}
Expand Down
15 changes: 5 additions & 10 deletions crates/sdk-core/tests/integ_tests/workflow_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ impl TimerWfNondeterministic {
(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
}
_ => panic!("Ran too many times"),
}
Expand Down Expand Up @@ -290,12 +289,10 @@ impl ActivityIdOrTypeChangeWf {
..Default::default()
},
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
} else {
ctx.start_local_activity(StdActivities::no_op, (), Default::default())
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
}
} else if id_change {
ctx.start_activity(
Expand All @@ -305,16 +302,14 @@ impl ActivityIdOrTypeChangeWf {
.activity_id("I'm bad and wrong!".to_string())
.build(),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
} else {
ctx.start_activity(
StdActivities::no_op,
(),
ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use temporalio_macros::{activities, workflow, workflow_methods};
use temporalio_sdk::{
ActivityExecutionError, ActivityOptions, ApplicationFailure, CancellableFuture,
LocalActivityOptions, WorkflowContext, WorkflowContextView, WorkflowResult,
WorkflowTermination,
activities::{ActivityContext, ActivityError},
interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor},
};
Expand Down Expand Up @@ -81,8 +80,7 @@ impl OneLocalActivityWf {
"hi!".to_string(),
LocalActivityOptions::default(),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
assert!(initial_workflow_time < ctx.workflow_time().unwrap());
Ok(())
}
Expand Down Expand Up @@ -1128,8 +1126,7 @@ async fn long_local_activity_with_update(
Duration::from_secs(6),
LocalActivityOptions::default(),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(ctx.state(|wf| wf.update_counter))
}

Expand Down Expand Up @@ -1273,8 +1270,7 @@ impl LocalActivityWithSummaryWf {
..Default::default()
},
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -1440,8 +1436,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
"hi".to_string(),
LocalActivityOptions::default(),
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -1662,8 +1657,7 @@ async fn local_act_null_result() {
#[run(name = DEFAULT_WORKFLOW_TYPE)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -1699,8 +1693,7 @@ async fn local_act_command_immediately_follows_la_marker() {
#[run(name = DEFAULT_WORKFLOW_TYPE)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
ctx.timer(Duration::from_secs(1)).await;
Ok(())
}
Expand Down Expand Up @@ -2319,8 +2312,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
..Default::default()
},
)
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
panic!()
}
}
Expand Down Expand Up @@ -2870,11 +2862,9 @@ impl TwoLaWf {
#[run(name = DEFAULT_WORKFLOW_TYPE)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -3007,12 +2997,10 @@ impl LaTimerLaWf {
#[run(name = DEFAULT_WORKFLOW_TYPE)]
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
ctx.timer(Duration::from_secs(5)).await;
ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default())
.await
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
.await?;
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl ResetRandomseedWf {
"hi!".to_string(),
LocalActivityOptions::default(),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
}
ctx.wait_condition(|s| s.post_fail_received).await;
ctx.state(|wf| wf.notify.notify_one());
Expand Down
3 changes: 1 addition & 2 deletions crates/sdk/examples/activity_heartbeating/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ impl HeartbeatingWorkflow {
.heartbeat_timeout(Duration::from_secs(5))
.build(),
)
.await
.map_err(|e| anyhow::anyhow!("{e}"))?;
.await?;
Ok(result)
}
}
Loading
Loading