Skip to content

Commit 08adc4a

Browse files
feat(sdk): add typed continue as new (#1209)
* feat(sdk): add typed continue as new * update readme and example * pr feedback
1 parent b544f95 commit 08adc4a

6 files changed

Lines changed: 324 additions & 46 deletions

File tree

crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use temporalio_client::WorkflowStartOptions;
44
use temporalio_common::{
55
protos::{
66
DEFAULT_WORKFLOW_TYPE, canned_histories,
7-
coresdk::{AsJsonPayloadExt, workflow_commands::ContinueAsNewWorkflowExecution},
7+
coresdk::workflow_commands::ContinueAsNewWorkflowExecution,
88
temporal::api::{
99
command::v1::command::Attributes,
1010
enums::v1::{CommandType, ContinueAsNewVersioningBehavior},
@@ -14,7 +14,7 @@ use temporalio_common::{
1414
worker::WorkerTaskTypes,
1515
};
1616
use temporalio_macros::{workflow, workflow_methods};
17-
use temporalio_sdk::{WorkflowContext, WorkflowResult, WorkflowTermination};
17+
use temporalio_sdk::{ContinueAsNewOptions, WorkflowContext, WorkflowResult, WorkflowTermination};
1818
use temporalio_sdk_core::{TunerHolder, test_help::MockPollCfg};
1919

2020
#[workflow]
@@ -27,15 +27,9 @@ impl ContinueAsNewWf {
2727
async fn run(ctx: &mut WorkflowContext<Self>, run_ct: u8) -> WorkflowResult<()> {
2828
ctx.timer(Duration::from_millis(500)).await;
2929
if run_ct < 5 {
30-
Err(WorkflowTermination::continue_as_new(
31-
ContinueAsNewWorkflowExecution {
32-
arguments: vec![(run_ct + 1).as_json_payload().unwrap()],
33-
..Default::default()
34-
},
35-
))
36-
} else {
37-
Ok(())
30+
ctx.continue_as_new(&(run_ct + 1), ContinueAsNewOptions::default())?;
3831
}
32+
Ok(())
3933
}
4034
}
4135

@@ -145,12 +139,8 @@ impl ContinueAsNewSuggestedWf {
145139
ctx.timer(Duration::from_millis(500)).await;
146140
// Second WFT: flag should be true (set on WFT started event 8)
147141
assert!(ctx.continue_as_new_suggested());
148-
Err(WorkflowTermination::continue_as_new(
149-
ContinueAsNewWorkflowExecution {
150-
arguments: vec![[1].into()],
151-
..Default::default()
152-
},
153-
))
142+
ctx.continue_as_new(&(), ContinueAsNewOptions::default())?;
143+
Ok(())
154144
}
155145
}
156146

crates/sdk/README.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,8 @@ let result = started.result().await?;
237237
### Continue-As-New
238238

239239
```rust
240-
// To continue as new, return an error with WorkflowTermination::ContinueAsNew
241-
Err(WorkflowTermination::continue_as_new(ContinueAsNewWorkflowExecution {
242-
workflow_type: "MyWorkflow".to_string(),
243-
arguments: vec![new_input.into()],
244-
..Default::default()
245-
}))
240+
// To continue as new, use the workflow context helper and propagate the termination
241+
ctx.continue_as_new(&new_input, ContinueAsNewOptions::default())?;
246242
```
247243

248244
### Patching (Versioning)
Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
#![allow(unreachable_pub)]
22
use std::time::Duration;
3-
use temporalio_common::protos::coresdk::{
4-
AsJsonPayloadExt, workflow_commands::ContinueAsNewWorkflowExecution,
5-
};
63
use temporalio_macros::{workflow, workflow_methods};
7-
use temporalio_sdk::{WorkflowContext, WorkflowResult, WorkflowTermination};
4+
use temporalio_sdk::{ContinueAsNewOptions, WorkflowContext, WorkflowResult};
85

96
#[workflow]
107
#[derive(Default)]
@@ -18,18 +15,12 @@ impl ContinueAsNewWorkflow {
1815
ctx.timer(Duration::from_millis(100)).await;
1916

2017
if current_iteration < max_iterations {
21-
Err(WorkflowTermination::continue_as_new(
22-
ContinueAsNewWorkflowExecution {
23-
arguments: vec![
24-
(current_iteration + 1, max_iterations)
25-
.as_json_payload()
26-
.unwrap(),
27-
],
28-
..Default::default()
29-
},
30-
))
31-
} else {
32-
Ok(format!("Completed after {max_iterations} iterations"))
18+
ctx.continue_as_new(
19+
&(current_iteration + 1, max_iterations),
20+
ContinueAsNewOptions::default(),
21+
)?;
3322
}
23+
24+
Ok(format!("Completed after {max_iterations} iterations"))
3425
}
3526
}

crates/sdk/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ pub use temporalio_client::Namespace;
100100
pub use workflow_context::{
101101
ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture,
102102
ChildWorkflowExecutionError, ChildWorkflowOptions, ChildWorkflowSignalError,
103-
ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
104-
RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause,
105-
StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView,
103+
ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions,
104+
ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData,
105+
StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext,
106+
TimerOptions, WorkflowContext, WorkflowContextView,
106107
};
107108

108109
use crate::{

crates/sdk/src/workflow_context.rs

Lines changed: 249 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
mod options;
22

33
pub use options::{
4-
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5-
SignalData, TimerOptions,
4+
ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions, LocalActivityOptions,
5+
NexusOperationOptions, Signal, SignalData, TimerOptions,
66
};
77
pub use temporalio_common::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
88

99
use crate::{
1010
CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
1111
CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
12-
SupportsCancelReason, TimerResult, UnblockEvent, Unblockable,
12+
SupportsCancelReason, TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
1313
workflow_context::options::IntoWorkflowCommand, workflow_executor::SdkWakeGuard,
1414
};
1515
use futures_util::{
@@ -726,6 +726,31 @@ impl<W> SyncWorkflowContext<W> {
726726
.fuse()
727727
}
728728

729+
/// Signal that this workflow should continue as a new workflow execution with the given input and
730+
/// options.
731+
///
732+
/// This always returns an `Err` which should be propigated.
733+
pub fn continue_as_new(
734+
&self,
735+
input: &<W::Run as WorkflowDefinition>::Input,
736+
opts: ContinueAsNewOptions,
737+
) -> Result<std::convert::Infallible, WorkflowTermination>
738+
where
739+
W: crate::workflows::WorkflowImplementation,
740+
{
741+
let pc = &self.base.inner.payload_converter;
742+
let ctx = SerializationContext {
743+
data: &SerializationContextData::Workflow,
744+
converter: pc,
745+
};
746+
let arguments = pc
747+
.to_payloads(&ctx, input)
748+
.map_err(WorkflowTermination::failed)?;
749+
let workflow_type = self.workflow_initial_info().workflow_type.clone();
750+
let proto = opts.into_proto(workflow_type, arguments);
751+
Err(WorkflowTermination::continue_as_new(proto))
752+
}
753+
729754
/// Request to create a timer
730755
pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
731756
self.base.timer(opts)
@@ -1115,6 +1140,21 @@ impl<W> WorkflowContext<W> {
11151140
result
11161141
}
11171142

1143+
/// Signal that this workflow should continue as a new workflow execution with the given input and
1144+
/// options.
1145+
///
1146+
/// This always returns an `Err` which should be propigated
1147+
pub fn continue_as_new(
1148+
&self,
1149+
input: &<W::Run as WorkflowDefinition>::Input,
1150+
opts: ContinueAsNewOptions,
1151+
) -> Result<std::convert::Infallible, WorkflowTermination>
1152+
where
1153+
W: crate::workflows::WorkflowImplementation,
1154+
{
1155+
self.sync.continue_as_new(input, opts)
1156+
}
1157+
11181158
/// Wait for some condition on workflow state to become true, yielding the workflow if not.
11191159
///
11201160
/// The condition closure receives an immutable reference to the workflow state,
@@ -2087,3 +2127,209 @@ impl StartedNexusOperation {
20872127
.cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
20882128
}
20892129
}
2130+
2131+
#[cfg(test)]
2132+
mod tests {
2133+
use super::*;
2134+
use std::collections::HashMap;
2135+
use temporalio_common::{
2136+
data_converters::{TemporalDeserializable, TemporalSerializable},
2137+
protos::{
2138+
coresdk::{AsJsonPayloadExt, common::VersioningIntent},
2139+
temporal::api::common::v1::{Payload, RetryPolicy},
2140+
},
2141+
};
2142+
use temporalio_macros::{workflow, workflow_methods};
2143+
2144+
#[workflow]
2145+
#[derive(Default)]
2146+
struct TestWorkflow;
2147+
2148+
#[workflow_methods]
2149+
impl TestWorkflow {
2150+
#[run]
2151+
async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
2152+
unreachable!("test workflow run should not be polled")
2153+
}
2154+
}
2155+
2156+
fn test_context() -> WorkflowContext<TestWorkflow> {
2157+
let init = InitializeWorkflow {
2158+
workflow_type: TestWorkflow.name().to_string(),
2159+
..Default::default()
2160+
};
2161+
let (_, cancelled_rx) = watch::channel(None);
2162+
let (base, _cmd_rx) = BaseWorkflowContext::new(
2163+
"default".to_string(),
2164+
"orig-task-queue".to_string(),
2165+
"run-id".to_string(),
2166+
init,
2167+
cancelled_rx,
2168+
PayloadConverter::default(),
2169+
);
2170+
WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
2171+
}
2172+
2173+
#[test]
2174+
fn workflow_context_continue_as_new_serializes_input_and_defaults() {
2175+
let ctx = test_context();
2176+
2177+
let termination = ctx
2178+
.continue_as_new(&7, ContinueAsNewOptions::default())
2179+
.expect_err("continue_as_new should terminate the workflow");
2180+
assert!(
2181+
matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2182+
"expected continue-as-new termination, got {termination:?}"
2183+
);
2184+
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2185+
unreachable!()
2186+
};
2187+
2188+
assert_eq!(
2189+
*cmd,
2190+
temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
2191+
workflow_type: TestWorkflow.name().to_string(),
2192+
arguments: vec![7u8.as_json_payload().unwrap()],
2193+
versioning_intent: VersioningIntent::Unspecified as i32,
2194+
..Default::default()
2195+
}
2196+
);
2197+
}
2198+
2199+
#[test]
2200+
fn sync_workflow_context_continue_as_new_applies_options() {
2201+
let ctx = test_context();
2202+
let sync = ctx.sync_context();
2203+
let mut memo = HashMap::new();
2204+
memo.insert(
2205+
"memo-key".to_string(),
2206+
Payload::from(b"memo-value".as_slice()),
2207+
);
2208+
let mut headers = HashMap::new();
2209+
headers.insert(
2210+
"header-key".to_string(),
2211+
Payload::from(b"header-value".as_slice()),
2212+
);
2213+
let mut search_attributes = SearchAttributes::default();
2214+
search_attributes.indexed_fields.insert(
2215+
"CustomKeywordField".to_string(),
2216+
Payload::from(b"value".as_slice()),
2217+
);
2218+
2219+
let termination = sync
2220+
.continue_as_new(
2221+
&11,
2222+
ContinueAsNewOptions {
2223+
workflow_type: Some("next-workflow".to_string()),
2224+
task_queue: Some("next-task-queue".to_string()),
2225+
run_timeout: Some(Duration::from_secs(10)),
2226+
task_timeout: Some(Duration::from_secs(3)),
2227+
memo: Some(memo.clone()),
2228+
headers: Some(headers.clone()),
2229+
search_attributes: Some(search_attributes.clone()),
2230+
retry_policy: Some(RetryPolicy {
2231+
maximum_attempts: 5,
2232+
..Default::default()
2233+
}),
2234+
versioning_intent: Some(VersioningIntent::Compatible),
2235+
},
2236+
)
2237+
.expect_err("continue_as_new should terminate the workflow");
2238+
assert!(
2239+
matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2240+
"expected continue-as-new termination, got {termination:?}"
2241+
);
2242+
let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2243+
unreachable!()
2244+
};
2245+
2246+
assert_eq!(
2247+
*cmd,
2248+
temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
2249+
workflow_type: "next-workflow".to_string(),
2250+
task_queue: "next-task-queue".to_string(),
2251+
arguments: vec![11u8.as_json_payload().unwrap()],
2252+
workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
2253+
workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
2254+
memo,
2255+
headers,
2256+
search_attributes: Some(search_attributes),
2257+
retry_policy: Some(RetryPolicy {
2258+
maximum_attempts: 5,
2259+
..Default::default()
2260+
}),
2261+
versioning_intent: VersioningIntent::Compatible as i32,
2262+
..Default::default()
2263+
}
2264+
);
2265+
}
2266+
2267+
#[test]
2268+
fn continue_as_new_reports_serialization_errors() {
2269+
#[derive(Debug)]
2270+
struct FailingInput;
2271+
2272+
impl TemporalSerializable for FailingInput {
2273+
fn to_payload(
2274+
&self,
2275+
_ctx: &temporalio_common::data_converters::SerializationContext<'_>,
2276+
) -> Result<Payload, temporalio_common::data_converters::PayloadConversionError>
2277+
{
2278+
Err(
2279+
temporalio_common::data_converters::PayloadConversionError::EncodingError(
2280+
std::io::Error::other("serialization failure").into(),
2281+
),
2282+
)
2283+
}
2284+
}
2285+
2286+
impl TemporalDeserializable for FailingInput {
2287+
fn from_payload(
2288+
_ctx: &temporalio_common::data_converters::SerializationContext<'_>,
2289+
_payload: Payload,
2290+
) -> Result<Self, temporalio_common::data_converters::PayloadConversionError>
2291+
{
2292+
unreachable!("test input is only serialized")
2293+
}
2294+
}
2295+
2296+
#[workflow]
2297+
#[derive(Default)]
2298+
struct FailingWorkflow;
2299+
2300+
#[workflow_methods]
2301+
impl FailingWorkflow {
2302+
#[run]
2303+
async fn run(
2304+
_ctx: &mut WorkflowContext<Self>,
2305+
_input: FailingInput,
2306+
) -> crate::WorkflowResult<()> {
2307+
unreachable!("test workflow run should not be polled")
2308+
}
2309+
}
2310+
2311+
let init = InitializeWorkflow {
2312+
workflow_type: "failing-workflow".to_string(),
2313+
..Default::default()
2314+
};
2315+
let (_, cancelled_rx) = watch::channel(None);
2316+
let (base, _cmd_rx) = BaseWorkflowContext::new(
2317+
"default".to_string(),
2318+
"orig-task-queue".to_string(),
2319+
"run-id".to_string(),
2320+
init,
2321+
cancelled_rx,
2322+
PayloadConverter::default(),
2323+
);
2324+
let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));
2325+
2326+
let err = ctx
2327+
.continue_as_new(&FailingInput, ContinueAsNewOptions::default())
2328+
.expect_err("serialization errors should be surfaced");
2329+
2330+
let WorkflowTermination::Failed(err) = err else {
2331+
panic!("expected failed termination, got {err:?}");
2332+
};
2333+
assert_eq!(err.to_string(), "Encoding error: serialization failure");
2334+
}
2335+
}

0 commit comments

Comments
 (0)