Skip to content
21 changes: 20 additions & 1 deletion crates/common/src/protos/canned_histories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,8 @@ pub fn single_child_workflow_try_cancelled(child_wf_id: &str) -> TestHistoryBuil
/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// 5: EVENT_TYPE_MARKER_RECORDED (la result)
/// 7: EVENT_TYPE_MARKER_RECORDED (la result)
/// 8: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED
Comment on lines +1327 to +1328
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updating comment here to match actual history

pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand All @@ -1340,6 +1341,24 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder {
t
}

/// History with two parallel local activities whose result markers are recorded in the
/// opposite order of their sequence numbers, used to exercise LA job/resolution ordering.
///
/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// 5: EVENT_TYPE_MARKER_RECORDED (LA 2 result)
/// 7: EVENT_TYPE_MARKER_RECORDED (LA 1 result)
/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED
// NOTE(review): the list above skips event id 6 (5 then 7) yet still ends at 9 — with two
// consecutive markers after event 4 the ids would normally be 5 and 6. Verify against the
// history this builder actually produces and renumber if needed.
pub fn parallel_las_job_order_hist() -> TestHistoryBuilder {
    let mut t = TestHistoryBuilder::default();
    t.add_by_type(EventType::WorkflowExecutionStarted);
    // Full WFT: scheduled + started + completed.
    t.add_full_wf_task();
    // Markers recorded out of seq order: LA seq 2's result lands before LA seq 1's.
    t.add_local_activity_result_marker(2, "2", b"hi2".into());
    t.add_local_activity_result_marker(1, "1", b"hi1".into());
    // Trailing WFT scheduled + started following the markers.
    t.add_workflow_task_scheduled_and_started();
    t
}

/// Useful for one-of needs to write a crafted history to a file. Writes it as serialized proto
/// binary to the provided path.
pub fn write_hist_to_binfile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ fsm! {
// is replaying), and then immediately scheduled and transitions to either requesting that lang
// execute the activity, or waiting for the marker from history.
Executing --(Schedule, shared on_schedule) --> RequestSent;
Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent;
ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved;
Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead;

// Execution path =============================================================================
RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated;
Expand All @@ -66,32 +65,28 @@ fsm! {
--> MarkerCommandRecorded;

// Replay path ================================================================================
// LAs on the replay path always need to eventually see the marker
WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;
WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
// If we are told to cancel while waiting for the marker, we still need to wait for the marker.
WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;

// Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling
// the LA and the marker being recorded, peekahead might not always resolve the LA *before*
// scheduling it. This transition accounts for that.
WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent;
WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
Comment on lines 73 to +79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Two minor naming/comment issues: (1) apply_local_action_peeked_resolutions and its doc comment use "local action" instead of "local activity", inconsistent with the rest of the codebase (LocalActivityMachine, LocalActivityData, local_activity_data, etc.). (2) The comment at lines 73-75 about peekahead resolution timing was originally attached to the HandleKnownResult transition but after the refactor now incorrectly describes the NoWaitCancel transitions below it — it should be moved above line 68.

Extended reasoning...

Naming inconsistency: "local action" vs "local activity"

The new function apply_local_action_peeked_resolutions (workflow_machines.rs:1660) and its doc comment on line 1659 ("Applies local action preresolutions peeked from history...") use the term "local action" instead of "local activity". The entire codebase consistently uses "local activity" — LocalActivityMachine, LocalActivityData, local_activity_data, LocalActivityExecutionResult, etc. The same incorrect term also appears in the comment at line 814 referencing this function name. This should be apply_local_activity_peeked_resolutions with the comment updated to say "local activity".

Misplaced FSM comment

In local_activity_state_machine.rs, lines 73-75 contain a comment:

// Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling
// the LA and the marker being recorded, peekahead might not always resolve the LA *before*
// scheduling it. This transition accounts for that.

This comment was originally placed above the HandleKnownResult(ResolveDat) transition in the old code, which it correctly described — that transition handles the case where peekahead resolves the LA after it has already been scheduled. After the refactor, HandleKnownResult was moved to line 68, but the comment was left at lines 73-75 where it now precedes the NoWaitCancel transitions on lines 76-79, which deal with cancellation semantics, not peekahead resolution timing.

Step-by-step proof

Looking at the FSM definition starting at line 65:

  • Line 68: WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; — this is the transition the comment describes
  • Lines 73-75: The peekahead comment
  • Lines 76-79: WaitingResolveFromMarkerLookAhead --(NoWaitCancel(...)) and ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(...)) — these are cancellation transitions, unrelated to peekahead

The comment says "This transition accounts for that" but "that" (peekahead not resolving before scheduling) is accounted for by HandleKnownResult on line 68, not NoWaitCancel on lines 76-79.

Impact

Both issues are cosmetic with zero runtime impact. The naming inconsistency makes the function harder to find via grep/search for "local activity", and the misplaced comment could confuse future contributors reading the FSM definition.

Fix

  1. Rename apply_local_action_peeked_resolutions to apply_local_activity_peeked_resolutions and update the doc comment and all references.
  2. Move the comment block from lines 73-75 to just above line 68 (before the HandleKnownResult transition).


// LAs on the replay path always need to eventually see the marker
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;

// It is entirely possible to have started the LA while replaying, only to find that we have
// reached a new WFT and there still was no marker. In such cases we need to execute the LA.
// This can easily happen if upon first execution, the worker does WFT heartbeating but then
// dies for some reason.
Comment on lines 67 to 88
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The WaitingResolveFromMarkerLookAhead state is missing a MarkerRecorded transition and is not handled in marker_should_get_special_handling(), causing a fatal error when LA markers arrive before lookahead has pre-resolved the machine. This happens during paginated replay with heartbeat WFTs: the 2-WFT extraction guarantee may yield WFT1 + heartbeat-WFT2 (no markers), and when WFT3 (containing the markers) arrives via page fetch, the LA machines are still in WaitingResolveFromMarkerLookAhead and hit the catch-all fatal!() branch. Fix by adding a MarkerRecorded transition from WaitingResolveFromMarkerLookAhead (resolving directly) and including it in marker_should_get_special_handling().

Extended reasoning...

Bug Analysis

The old code had a WaitingMarkerEvent state with a direct MarkerRecorded transition:

WaitingMarkerEvent --(MarkerRecorded(...)) --> MarkerCommandRecorded

and marker_should_get_special_handling() returned Ok(true) for WaitingMarkerEvent. The new code replaces this with WaitingResolveFromMarkerLookAhead, which has no MarkerRecorded transition and is not listed in marker_should_get_special_handling() (falling into _ => Err(fatal!(...))). The design assumes lookahead always pre-resolves the machine to ResolvedFromMarkerLookAheadWaitingMarkerEvent before the marker event is processed.

Why the assumption fails

The HistoryPaginator::extract_next_update() guarantees at least 2 WFT sequences when more pages exist (line 294: if update.wft_count < 2 { continue }). However, this guarantee is insufficient when heartbeat WFTs are present. Consider this history:

WFT1 (events 1-3): Schedule LA1, LA2
WFT2 (events 4-5): Heartbeat (LAs still running)
WFT3 (events 6-10): WFT2_completed, LA1 marker, LA2 marker, WFT3_scheduled, WFT3_started

extract_next_update() returns events 1-5 (WFT1 + WFT2 = 2 WFTs), satisfying the guarantee, but the LA markers are in WFT3 on a different page.

Step-by-step proof

  1. Initial update: extract_next_update() returns events 1-5 (WFT1 + heartbeat WFT2). wft_count == 2, so it stops fetching despite more pages existing.
  2. Process WFT1: apply_next_wft_from_history takes events 1-3. The lookahead (peek_next_wft_sequence) sees events 4-5 (heartbeat WFT2) — no LA markers found, so no preresolutions are stored.
  3. Activation to lang: Lang schedules LA1 and LA2.
  4. Completion processing: push_commands_and_iterate → iterate_machines → handle_driven_results creates LA machines in WaitingResolveFromMarkerLookAhead. apply_local_action_peeked_resolutions() finds no preresolutions.
  5. Process WFT2 (heartbeat): apply_next_task_if_ready → apply_next_wft_from_history takes events 4-5. Lookahead peeks — buffer is now empty (WFT3 not fetched yet). No preresolutions stored.
  6. Lang completes WFT2: ready_to_apply_next_wft() returns false (buffer empty, not last WFT). Completion is deferred, page fetch is triggered.
  7. Page 2 arrives: _fetched_page_completion → _process_completion calls push_commands_and_iterate (for the deferred WFT2 completion) then feed_history_from_new_page(update).
  8. feed_history_from_new_page → new_history_from_server → apply_next_wft_from_history processes WFT3 events. The event loop encounters event 7 (LA marker).
  9. handle_command_event → marker_should_get_special_handling() is called on the LA machine, which is still in WaitingResolveFromMarkerLookAhead. This state hits the _ => Err(fatal!(...)) catch-all → fatal error.

The lookahead at the end of apply_next_wft_from_history (which peeks at WFT4) cannot help because the marker is processed in the event loop before the lookahead runs.

Addressing the refutation

The refutation correctly states that extract_next_update guarantees ≥2 WFT sequences. However, when heartbeat WFTs are present, those 2 WFTs can be WFT1 + heartbeat-WFT2, which contain no LA markers. The markers in WFT3 require a separate page fetch, during which the machines remain in the unresolvable WaitingResolveFromMarkerLookAhead state. The 2-WFT guarantee prevents the scenario only when markers appear in the immediate next WFT after scheduling — it does not cover the heartbeat + pagination case.

Impact

This is a regression from the old code. In production, long-running workflows commonly use WFT heartbeating (especially with local activities), and large histories require pagination. When page boundaries fall between a heartbeat WFT and the WFT containing LA markers, the workflow will hit a fatal error during replay, preventing it from making progress.

Fix

Add a MarkerRecorded transition from WaitingResolveFromMarkerLookAhead that resolves the LA directly (like the old WaitingMarkerEvent did), and add WaitingResolveFromMarkerLookAhead to the match in marker_should_get_special_handling() returning Ok(true). This restores the fallback path that the old code had.

WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// If the activity is pre resolved we still expect to see marker recorded event at some point,
// even though we already resolved the activity.
WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData),
shared on_marker_recorded) --> MarkerCommandRecorded;
// Ignore cancellations when waiting for the marker after being pre-resolved
WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved;
WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType))
--> WaitingMarkerEventPreResolved;
WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// Ignore cancellation in final state
MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded;
Expand Down Expand Up @@ -151,22 +146,12 @@ impl From<CompleteLocalActivityData> for ResolveDat {
pub(super) fn new_local_activity(
mut attrs: ValidScheduleLA,
replaying_when_invoked: bool,
maybe_pre_resolved: Option<ResolveDat>,
wf_time: Option<SystemTime>,
internal_flags: InternalFlagsRef,
) -> Result<(LocalActivityMachine, Vec<MachineResponse>), WFMachinesError> {
let initial_state = if replaying_when_invoked {
if let Some(dat) = maybe_pre_resolved {
ReplayingPreResolved { dat }.into()
} else {
Replaying {}.into()
}
Replaying {}.into()
} else {
if maybe_pre_resolved.is_some() {
return Err(nondeterminism!(
"Local activity cannot be created as pre-resolved while not replaying"
));
}
Executing {}.into()
};

Expand Down Expand Up @@ -202,15 +187,12 @@ impl LocalActivityMachine {
/// command-event processing - instead simply applying the event to this machine and then
/// skipping over the rest. If this machine is in the `ResultNotified` state, that means
/// command handling should proceed as normal (ie: The command needs to be matched and removed).
/// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]`
/// states, which will return true.
///
/// Attempting the check in any other state likely means a bug in the SDK.
pub(super) fn marker_should_get_special_handling(&self) -> Result<bool, WFMachinesError> {
match self.state() {
LocalActivityMachineState::ResultNotified(_) => Ok(false),
LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true),
LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true),
LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true),
_ => Err(fatal!(
"Attempted to check for LA marker handling in invalid state {}",
self.state()
Expand All @@ -223,7 +205,7 @@ impl LocalActivityMachine {
pub(super) fn will_accept_resolve_marker(&self) -> bool {
matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
)
}

Expand All @@ -234,7 +216,7 @@ impl LocalActivityMachine {
// This only applies to the waiting-for-marker state. It can safely be ignored in the others
if !matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
) {
return Ok(vec![]);
}
Expand Down Expand Up @@ -368,12 +350,6 @@ pub(super) enum LocalActivityCommand {
RequestActivityExecution(ValidScheduleLA),
#[display("Resolved")]
Resolved(ResolveDat),
/// The fake marker is used to avoid special casing marker recorded event handling.
/// If we didn't have the fake marker, there would be no "outgoing command" to match
/// against the event. This way there is, but the command never will be issued to
/// server because it is understood to be meaningless.
#[display("FakeMarker")]
FakeMarker,
/// Indicate we want to cancel an LA that is currently executing, or look up if we have
/// processed a marker with resolution data since the machine was constructed.
#[display("Cancel")]
Expand Down Expand Up @@ -448,31 +424,10 @@ impl MarkerCommandRecorded {
#[derive(Default, Clone)]
pub(super) struct Replaying {}
impl Replaying {
pub(super) fn on_schedule(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
TransitionResult::ok(
[],
WaitingMarkerEvent {
already_resolved: false,
},
)
}
}

#[derive(Clone)]
pub(super) struct ReplayingPreResolved {
dat: ResolveDat,
}
impl ReplayingPreResolved {
pub(super) fn on_schedule(
self,
) -> LocalActivityMachineTransition<WaitingMarkerEventPreResolved> {
TransitionResult::ok(
[
LocalActivityCommand::FakeMarker,
LocalActivityCommand::Resolved(self.dat),
],
WaitingMarkerEventPreResolved {},
)
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {})
}
}

Expand Down Expand Up @@ -559,35 +514,16 @@ impl ResultNotified {
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEvent {
already_resolved: bool,
}
pub(super) struct WaitingResolveFromMarkerLookAhead {}

impl WaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(
shared,
&dat,
TransitionResult::commands(if self.already_resolved {
vec![]
} else {
vec![LocalActivityCommand::Resolved(dat.into())]
})
)
}
impl WaitingResolveFromMarkerLookAhead {
fn on_handle_result(
self,
dat: ResolveDat,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
TransitionResult::ok(
[LocalActivityCommand::Resolved(dat)],
WaitingMarkerEvent {
already_resolved: true,
},
ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
)
}
pub(super) fn on_started_non_replay_wft(
Expand All @@ -601,7 +537,9 @@ impl WaitingMarkerEvent {
)])
}

fn on_cancel_requested(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
Expand All @@ -610,23 +548,41 @@ impl WaitingMarkerEvent {
fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEventPreResolved {}
impl WaitingMarkerEventPreResolved {
pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {}

impl ResolvedFromMarkerLookAheadWaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(shared, &dat, TransitionResult::default())
}

fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
}

fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

impl WFMachinesAdapter for LocalActivityMachine {
Expand Down Expand Up @@ -780,12 +736,6 @@ impl WFMachinesAdapter for LocalActivityMachine {
}
Ok(responses)
}
LocalActivityCommand::FakeMarker => {
// See docs for `FakeMarker` for more
Ok(vec![MachineResponse::IssueFakeLocalActivityMarker(
self.shared_state.attrs.seq,
)])
}
LocalActivityCommand::RequestCancel => {
Ok(vec![MachineResponse::RequestCancelLocalActivity(
self.shared_state.attrs.seq,
Expand Down
Loading
Loading