Skip to content

Commit fc0daae

Browse files
committed
Implements the fix and changes workflow task (WFT) boundaries
* One unit test (UT) has been fixed to demonstrate the change; the others still fail
1 parent 20ecd89 commit fc0daae

5 files changed

Lines changed: 85 additions & 36 deletions

File tree

core/src/worker/workflow/history_update.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,8 @@ fn find_end_index_of_next_wft_seq(
695695
return NextWFTSeqEndIndex::Incomplete(0);
696696
}
697697
let mut last_index = 0;
698-
let mut saw_any_command_event = false;
698+
let mut saw_command_or_started = false;
699+
let mut saw_command = false;
699700
let mut wft_started_event_id_to_index = vec![];
700701
for (ix, e) in events.iter().enumerate() {
701702
last_index = ix;
@@ -706,8 +707,11 @@ fn find_end_index_of_next_wft_seq(
706707
continue;
707708
}
708709

709-
if e.is_command_event() || e.event_type() == EventType::WorkflowExecutionStarted {
710-
saw_any_command_event = true;
710+
if e.is_command_event() {
711+
saw_command = true;
712+
if e.event_type() == EventType::WorkflowExecutionStarted {
713+
saw_command_or_started = true;
714+
}
711715
}
712716
if e.is_final_wf_execution_event() {
713717
return NextWFTSeqEndIndex::Complete(last_index);
@@ -728,13 +732,14 @@ fn find_end_index_of_next_wft_seq(
728732
| EventType::WorkflowExecutionCanceled
729733
) {
730734
continue;
731-
}
732-
// If we've never seen an interesting event and the next two events are a completion
733-
// followed immediately again by scheduled, then this is a WFT heartbeat and also
734-
// doesn't conclude the sequence.
735-
else if next_event_type == EventType::WorkflowTaskCompleted {
735+
} else if next_event_type == EventType::WorkflowTaskCompleted {
736736
if let Some(next_next_event) = events.get(ix + 2) {
737-
if next_next_event.event_type() == EventType::WorkflowTaskScheduled {
737+
if !saw_command
738+
&& next_next_event.event_type() == EventType::WorkflowTaskScheduled
739+
{
740+
// If we've never seen an interesting event and the next two events are
741+
// a completion followed immediately again by scheduled, then this is a
742+
// WFT heartbeat and also doesn't conclude the sequence.
738743
continue;
739744
} else {
740745
// If we see an update accepted command after WFT completed, we want to
@@ -766,18 +771,18 @@ fn find_end_index_of_next_wft_seq(
766771
}
767772
return NextWFTSeqEndIndex::Complete(ix);
768773
}
769-
} else if !has_last_wft && !saw_any_command_event {
774+
} else if !has_last_wft && !saw_command_or_started {
770775
// Don't have enough events to look ahead of the WorkflowTaskCompleted. Need
771776
// to fetch more.
772777
continue;
773778
}
774779
}
775-
} else if !has_last_wft && !saw_any_command_event {
780+
} else if !has_last_wft && !saw_command_or_started {
776781
// Don't have enough events to look ahead of the WorkflowTaskStarted. Need to fetch
777782
// more.
778783
continue;
779784
}
780-
if saw_any_command_event {
785+
if saw_command_or_started {
781786
return NextWFTSeqEndIndex::Complete(ix);
782787
}
783788
}
@@ -921,7 +926,9 @@ mod tests {
921926
let seq = next_check_peek(&mut update, 0);
922927
assert_eq!(seq.len(), 6);
923928
let seq = next_check_peek(&mut update, 6);
924-
assert_eq!(seq.len(), 13);
929+
assert_eq!(seq.len(), 4);
930+
let seq = next_check_peek(&mut update, 10);
931+
assert_eq!(seq.len(), 9);
925932
let seq = next_check_peek(&mut update, 19);
926933
assert_eq!(seq.len(), 4);
927934
let seq = next_check_peek(&mut update, 23);

sdk-core-protos/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -628,8 +628,8 @@ pub mod coresdk {
628628
workflow_activation_job::Variant::ResolveRequestCancelExternalWorkflow(_) => {
629629
write!(f, "ResolveRequestCancelExternalWorkflow")
630630
}
631-
workflow_activation_job::Variant::DoUpdate(_) => {
632-
write!(f, "DoUpdate")
631+
workflow_activation_job::Variant::DoUpdate(u) => {
632+
write!(f, "DoUpdate({})", u.id)
633633
}
634634
workflow_activation_job::Variant::ResolveNexusOperationStart(_) => {
635635
write!(f, "ResolveNexusOperationStart")

sdk/src/workflow_context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ impl WfContext {
151151
self.shared.read().random_seed
152152
}
153153

154+
/// Returns true if the current workflow task is happening under replay
155+
pub fn is_replaying(&self) -> bool {
156+
self.shared.read().is_replaying
157+
}
158+
154159
/// Return various information that the workflow was initialized with. Will eventually become
155160
/// a proper non-proto workflow info struct.
156161
pub fn workflow_initial_info(&self) -> &InitializeWorkflow {

tests/integ_tests/update_tests.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,46 +1154,48 @@ async fn update_lost_on_activity_mismatch() {
11541154
let mut worker = starter.worker().await;
11551155
let client = starter.get_client().await;
11561156

1157-
static CAN_RUN: AtomicBool = AtomicBool::new(true);
1158-
static DID_FAIL: AtomicBool = AtomicBool::new(false);
11591157
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| async move {
1158+
let can_run = Arc::new(AtomicUsize::new(1));
1159+
let cr = can_run.clone();
11601160
ctx.update_handler(
11611161
"update",
11621162
|_: &_, _: ()| Ok(()),
1163-
move |_: UpdateContext, _: ()| async move {
1164-
dbg!("Running update!!");
1165-
CAN_RUN.store(true, Ordering::Relaxed);
1166-
Ok(())
1163+
move |_: UpdateContext, _: ()| {
1164+
let cr = cr.clone();
1165+
async move {
1166+
cr.fetch_add(1, Ordering::Relaxed);
1167+
Ok(())
1168+
}
11671169
},
11681170
);
1169-
for i in 1..=3 {
1170-
ctx.wait_condition(|| CAN_RUN.load(Ordering::Relaxed)).await;
1171-
if i == 2 {
1172-
ctx.force_task_fail(anyhow!("Fail on purpose"));
1173-
DID_FAIL.store(true, Ordering::Relaxed);
1174-
}
1171+
for _ in 1..=3 {
1172+
let cr = can_run.clone();
1173+
ctx.wait_condition(|| cr.load(Ordering::Relaxed) > 0).await;
11751174
ctx.activity(ActivityOptions {
11761175
activity_type: "echo".to_string(),
11771176
input: "hi!".as_json_payload().expect("serializes fine"),
11781177
start_to_close_timeout: Some(Duration::from_secs(2)),
11791178
..Default::default()
11801179
})
11811180
.await;
1182-
CAN_RUN.store(false, Ordering::Release);
1181+
can_run.fetch_sub(1, Ordering::Release);
11831182
}
11841183
Ok(().into())
11851184
});
11861185
worker.register_activity("echo", |_ctx: ActContext, echo_me: String| async move {
11871186
Ok(echo_me)
11881187
});
11891188

1189+
let core_worker = worker.core_worker.clone();
11901190
let handle = starter.start_with_worker(wf_name, &mut worker).await;
11911191

11921192
let wf_id = starter.get_task_queue().to_string();
11931193
let update = async {
1194+
// Need time to get to condition
1195+
tokio::time::sleep(Duration::from_millis(200)).await;
1196+
// Evict wf
1197+
core_worker.request_workflow_eviction(handle.info().run_id.as_ref().unwrap());
11941198
for _ in 1..=2 {
1195-
// TODO: Without this, hangs, verify it doesn't after fix
1196-
tokio::time::sleep(Duration::from_millis(500)).await;
11971199
let res = client
11981200
.update_workflow_execution(
11991201
wf_id.clone(),
@@ -1213,9 +1215,6 @@ async fn update_lost_on_activity_mismatch() {
12131215
worker.run_until_done().await.unwrap();
12141216
};
12151217
join!(update, runner);
1216-
// This triggers the NDE before bugfix
1217-
CAN_RUN.store(true, Ordering::Relaxed);
1218-
dbg!("Replaying");
12191218
handle
12201219
.fetch_history_and_replay(worker.inner_mut())
12211220
.await

tests/integ_tests/workflow_tests/determinism.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::{
2-
sync::atomic::{AtomicUsize, Ordering},
2+
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
33
time::Duration,
44
};
5-
use temporal_sdk::{ActivityOptions, WfContext, WorkflowResult};
6-
use temporal_sdk_core_test_utils::CoreWfStarter;
5+
use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult};
6+
use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt;
7+
use temporal_sdk_core_test_utils::{CoreWfStarter, WorkflowHandleExt};
78

89
static RUN_CT: AtomicUsize = AtomicUsize::new(1);
910

@@ -45,3 +46,40 @@ async fn test_determinism_error_then_recovers() {
4546
// 4 because we still add on the 3rd and final attempt
4647
assert_eq!(RUN_CT.load(Ordering::Relaxed), 4);
4748
}
49+
50+
#[tokio::test]
51+
async fn task_fail_causes_replay_unset_too_soon() {
52+
let wf_name = "task_fail_causes_replay_unset_too_soon";
53+
let mut starter = CoreWfStarter::new(wf_name);
54+
let mut worker = starter.worker().await;
55+
56+
static DID_FAIL: AtomicBool = AtomicBool::new(false);
57+
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| async move {
58+
if DID_FAIL.load(Ordering::Relaxed) {
59+
assert!(ctx.is_replaying());
60+
}
61+
ctx.activity(ActivityOptions {
62+
activity_type: "echo".to_string(),
63+
input: "hi!".as_json_payload().expect("serializes fine"),
64+
start_to_close_timeout: Some(Duration::from_secs(2)),
65+
..Default::default()
66+
})
67+
.await;
68+
if !DID_FAIL.load(Ordering::Relaxed) {
69+
DID_FAIL.store(true, Ordering::Relaxed);
70+
panic!("Die on purpose");
71+
}
72+
Ok(().into())
73+
});
74+
worker.register_activity("echo", |_ctx: ActContext, echo_me: String| async move {
75+
Ok(echo_me)
76+
});
77+
78+
let handle = starter.start_with_worker(wf_name, &mut worker).await;
79+
80+
worker.run_until_done().await.unwrap();
81+
handle
82+
.fetch_history_and_replay(worker.inner_mut())
83+
.await
84+
.unwrap();
85+
}

0 commit comments

Comments
 (0)