Commit 1bd57bd

lint and format
1 parent 004d6b2 commit 1bd57bd

6 files changed

Lines changed: 1868 additions & 218 deletions

arch_docs/workflow_task_chunking.md

Lines changed: 28 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -4,20 +4,20 @@ One source of complexity in Core is the chunking of history into "logical" Workf
44

55
Workflow tasks (WFTs) always take the following form in event history:
66

7-
* \[Preceding Events\] (optional)
8-
* WFT Scheduled
9-
* WFT Started
10-
* WFT Completed
11-
* \[Commands\] (optional)
7+
- \[Preceding Events\] (optional)
8+
- WFT Scheduled
9+
- WFT Started
10+
- WFT Completed
11+
- \[Commands\] (optional)
1212

1313
In the typical case, the "logical" WFT consists of all the commands from the last workflow task,
1414
any events generated in the interim, and the scheduled/started preamble. So:
1515

16-
* WFT Completed
17-
* \[Commands\] (optional)
18-
* \[Events\] (optional)
19-
* WFT Scheduled
20-
* WFT Started
16+
- WFT Completed
17+
- \[Commands\] (optional)
18+
- \[Events\] (optional)
19+
- WFT Scheduled
20+
- WFT Started
2121

2222
Commands and events are both "optional" in the sense that:
2323

@@ -32,8 +32,22 @@ There may be no events for more nuanced reasons:
3232
rather as a "protocol message" attached to the task.
3333
3. Server can forcibly generate a new WFT with some obscure APIs
3434

35-
Core does not consider such empty WFT sequences as worthy of waking lang (on replay - as a new
36-
task, they always will), since nothing meaningful has happened. Thus, they are grouped together
35+
## Handling of empty WFTs (March 2026)
36+
37+
Until now, Core would try to avoid waking lang when replaying empty WFT sequences (i.e. a WFT
38+
Completed immediately followed by a WFT Scheduled and WFT Started, with no commands and no events
39+
in-between), since they would presumably be no-ops and therefore waste execution resources. Instead,
40+
the second WFT (and potentially subsequent WFTs) would be collapsed into the first, resulting in a
41+
single WFT that has the inbound events and start time of the first WFT, but the resulting commands
42+
of the last WFT in the collapsed sequence.
43+
44+
It was found that there are some particular edge cases where WFT collapsing might result in
45+
incorrect replay behavior. One particular example of this is the case where an update request would
46+
be sent after an empty WFT. This led to the development of some heuristics to avoid incorrect
47+
chunking in the presence of known problematic patterns. Unfortunately, it has been found that these
48+
heuristics may
49+
50+
Thus, they are grouped together
3751
as part of a "logical" WFT with the last WFT that had any real work in it.
3852

3953
## Possible issues as of this writing (5/25)
@@ -44,8 +58,8 @@ NDE.
4458

4559
### Possible solutions
4660

47-
* Core can attach a flag on WFT completes in order to be explicit that that WFT may be skipped on
61+
- Core can attach a flag on WFT completes in order to be explicit that that WFT may be skipped on
4862
replay, i.e. during WFT heartbeating for local activities (LAs).
49-
* We could legislate that server should never send empty WFTs. Seemingly the only case of this
63+
- We could legislate that server should never send empty WFTs. Seemingly the only case of this
5064
is
5165
the [obscure api](https://github.com/temporalio/temporal/blob/d189737aa2ed1b07c221abb9fbdd28ecf68f0492/proto/internal/temporal/server/api/adminservice/v1/service.proto#L151)
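
The legacy collapsing rule described in the arch doc above can be sketched as follows. This is a minimal illustration, not Core's implementation: the `Wft` struct and the `collapse_empty_wfts` function are hypothetical names invented here, events and commands are modeled as plain strings, and the sketch deliberately ignores updates and paginated history, which are the cases the doc identifies as problematic.

```rust
/// Hypothetical, simplified stand-in for one workflow task as seen in history.
#[derive(Debug, Clone, PartialEq)]
pub struct Wft {
    /// Events recorded before this task's WFT Scheduled event.
    pub inbound_events: Vec<String>,
    /// Time of the WFT Started event (arbitrary units).
    pub started_at: u64,
    /// Commands recorded after this task's WFT Completed event.
    pub commands: Vec<String>,
}

/// Collapses runs of empty WFTs the way the legacy behavior is described:
/// when a task completed with no commands and the next task has no inbound
/// events, the next task is folded into the earlier one. The merged task keeps
/// the inbound events and start time of the first task and takes the commands
/// of the last task in the run.
pub fn collapse_empty_wfts(wfts: &[Wft]) -> Vec<Wft> {
    let mut out: Vec<Wft> = Vec::new();
    for wft in wfts {
        match out.last_mut() {
            // Nothing happened between the previous completion and this task.
            Some(prev) if prev.commands.is_empty() && wft.inbound_events.is_empty() => {
                prev.commands = wft.commands.clone();
            }
            // Either this is the first task or real work separates the two.
            _ => out.push(wft.clone()),
        }
    }
    out
}
```

Because the merge only looks at the previous output entry, a run of several empty tasks folds into a single logical WFT without any extra bookkeeping.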

crates/sdk-core/src/core_tests/workflow_tasks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2463,7 +2463,7 @@ async fn core_internal_flags() {
24632463
.iter()
24642464
.copied()
24652465
.collect::<HashSet<_>>(),
2466-
CoreInternalFlags::all_except_too_high()
2466+
CoreInternalFlags::all_except_unknown()
24672467
.map(|f| f as u32)
24682468
.collect()
24692469
);

crates/sdk-core/src/internal_flags.rs

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ pub enum CoreInternalFlags {
2525
/// In this flag additional checks were added to a number of state machines to ensure that
2626
/// the ID and type of activities, local activities, and child workflows match during replay.
2727
IdAndTypeDeterminismChecks = 1,
28+
2829
/// Introduced automatically upserting search attributes for each patched call, and
2930
/// nondeterminism checks for upserts.
3031
UpsertSearchAttributeOnPatch = 2,
32+
3133
/// Prior to this flag, we truncated commands received from lang at the
3234
/// first terminal (i.e. workflow-terminating) command. With this flag, we
3335
/// reorder commands such that all non-terminal commands come first,
@@ -37,8 +39,57 @@ pub enum CoreInternalFlags {
3739
/// if in the sequence delivered by lang they came after a terminal command.
3840
/// See <https://github.com/temporalio/features/issues/481>.
3941
MoveTerminalCommands = 3,
42+
43+
/// Indicates that this workflow uses the improved WFT heartbeat heuristic that
44+
/// is more conservative when deciding whether to collapse consecutive empty WFTs.
45+
/// The improved heuristic avoids incorrectly collapsing WFTs that carry updates,
46+
/// and requires more look-ahead before committing to a chunking decision when
47+
/// history is paginated.
48+
///
49+
/// Only set on workflows started after this flag was introduced (first-WFT-only).
50+
/// Existing workflows continue to use the legacy heuristic.
51+
ImprovedHeartbeatHeuristic = 4,
52+
4053
/// We received a value higher than this code can understand.
41-
TooHigh = u32::MAX,
54+
UnknownFlag = u32::MAX,
55+
}
56+
57+
impl CoreInternalFlags {
58+
fn from_u32(v: u32) -> Self {
59+
match v {
60+
1 => Self::IdAndTypeDeterminismChecks,
61+
2 => Self::UpsertSearchAttributeOnPatch,
62+
3 => Self::MoveTerminalCommands,
63+
4 => Self::ImprovedHeartbeatHeuristic,
64+
_ => Self::UnknownFlag,
65+
}
66+
}
67+
68+
/// Returns all cumulative flags that should be enabled by default on every WFT completion.
69+
pub(crate) fn all_cumulative_default_enabled() -> impl Iterator<Item = CoreInternalFlags> {
70+
[
71+
CoreInternalFlags::IdAndTypeDeterminismChecks,
72+
CoreInternalFlags::UpsertSearchAttributeOnPatch,
73+
CoreInternalFlags::MoveTerminalCommands,
74+
]
75+
.iter()
76+
.copied()
77+
}
78+
79+
/// Returns cumulative flags that should only be enabled on the first WFT of new workflows.
80+
/// These are not written on subsequent WFTs to avoid changing behavior of existing workflows.
81+
pub(crate) fn all_first_wft_only_default_enabled() -> impl Iterator<Item = CoreInternalFlags> {
82+
[CoreInternalFlags::ImprovedHeartbeatHeuristic]
83+
.iter()
84+
.copied()
85+
}
86+
87+
/// Returns all known flag variants (excluding the sentinel).
88+
#[cfg(test)]
89+
pub(crate) fn all_except_unknown() -> impl Iterator<Item = CoreInternalFlags> {
90+
enum_iterator::all::<CoreInternalFlags>()
91+
.filter(|f| !matches!(f, CoreInternalFlags::UnknownFlag))
92+
}
4293
}
4394

4495
#[allow(clippy::large_enum_variant)]
@@ -137,15 +188,19 @@ impl InternalFlags {
137188
}
138189
}
139190

140-
/// Writes all known core flags to the set which should be recorded in the current WFT if not
141-
/// already known. Must only be called if not replaying.
142-
pub(crate) fn write_all_known(&mut self) {
191+
/// Writes all core flags that should be enabled by default to the set which should be recorded
192+
/// in the current WFT if not already known. Must only be called if not replaying.
193+
pub(crate) fn write_all_cumulative_default_enabled(&mut self, is_first_wft: bool) {
143194
if let Self::Enabled {
144195
core_since_last_complete,
145196
..
146197
} = self
147198
{
148-
core_since_last_complete.extend(CoreInternalFlags::all_except_too_high());
199+
if is_first_wft {
200+
core_since_last_complete
201+
.extend(CoreInternalFlags::all_first_wft_only_default_enabled());
202+
}
203+
core_since_last_complete.extend(CoreInternalFlags::all_cumulative_default_enabled());
149204
}
150205
}
151206

@@ -214,22 +269,6 @@ impl InternalFlags {
214269
}
215270
}
216271

217-
impl CoreInternalFlags {
218-
fn from_u32(v: u32) -> Self {
219-
match v {
220-
1 => Self::IdAndTypeDeterminismChecks,
221-
2 => Self::UpsertSearchAttributeOnPatch,
222-
3 => Self::MoveTerminalCommands,
223-
_ => Self::TooHigh,
224-
}
225-
}
226-
227-
pub(crate) fn all_except_too_high() -> impl Iterator<Item = CoreInternalFlags> {
228-
enum_iterator::all::<CoreInternalFlags>()
229-
.filter(|f| !matches!(f, CoreInternalFlags::TooHigh))
230-
}
231-
}
232-
233272
#[cfg(test)]
234273
mod tests {
235274
use super::*;
@@ -266,7 +305,7 @@ mod tests {
266305

267306
#[test]
268307
fn all_have_u32_from_impl() {
269-
let all_known = CoreInternalFlags::all_except_too_high();
308+
let all_known = CoreInternalFlags::all_except_unknown();
270309
for flag in all_known {
271310
let as_u32 = flag as u32;
272311
assert_eq!(CoreInternalFlags::from_u32(as_u32), flag);
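
The split between cumulative flags and first-WFT-only flags in this file can be sketched in isolation. The snippet below is a standalone approximation under stated assumptions: `Flag` mirrors the variants and numeric values shown in the diff, while `flags_to_write` is a hypothetical helper that stands in for `write_all_cumulative_default_enabled` and returns a set instead of mutating `InternalFlags`.

```rust
use std::collections::BTreeSet;

/// Standalone mirror of the variants shown in the diff (not the real type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u32)]
pub enum Flag {
    IdAndTypeDeterminismChecks = 1,
    UpsertSearchAttributeOnPatch = 2,
    MoveTerminalCommands = 3,
    ImprovedHeartbeatHeuristic = 4,
    /// Sentinel for values this code does not understand.
    UnknownFlag = u32::MAX,
}

impl Flag {
    /// Maps a raw value to a flag, falling back to the sentinel.
    pub fn from_u32(v: u32) -> Self {
        match v {
            1 => Self::IdAndTypeDeterminismChecks,
            2 => Self::UpsertSearchAttributeOnPatch,
            3 => Self::MoveTerminalCommands,
            4 => Self::ImprovedHeartbeatHeuristic,
            _ => Self::UnknownFlag,
        }
    }
}

/// Hypothetical helper: which flags would be recorded on a WFT completion.
/// First-WFT-only flags are added only when `is_first_wft` is true, so
/// workflows that already have history keep the legacy behavior.
pub fn flags_to_write(is_first_wft: bool) -> BTreeSet<Flag> {
    let mut set = BTreeSet::new();
    if is_first_wft {
        set.insert(Flag::ImprovedHeartbeatHeuristic);
    }
    set.extend([
        Flag::IdAndTypeDeterminismChecks,
        Flag::UpsertSearchAttributeOnPatch,
        Flag::MoveTerminalCommands,
    ]);
    set
}
```

The point of the split is that `ImprovedHeartbeatHeuristic` changes how existing history is chunked, so it is only safe to record it before any chunking decision has been made for that workflow.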

crates/sdk-core/src/replay/history_builder.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::replay::HistoryInfo;
1+
use crate::{internal_flags::CoreInternalFlags, replay::HistoryInfo};
22
use anyhow::bail;
33
use prost_types::Timestamp;
44
use std::{
@@ -48,6 +48,11 @@ pub struct TestHistoryBuilder {
4848
final_workflow_task_started_event_id: i64,
4949
previous_task_completed_id: i64,
5050
original_run_id: String,
51+
/// When true, the builder auto-sets `ImprovedHeartbeatHeuristic` on the first WFTCompleted.
52+
use_improved_heartbeat_heuristic: bool,
53+
/// Tracks whether we've already emitted the first WFTCompleted
54+
/// (to know when to set the cumulative `ImprovedHeartbeatHeuristic` flag).
55+
has_seen_wft_completed: bool,
5156
}
5257

5358
impl TestHistoryBuilder {
@@ -69,6 +74,8 @@ impl TestHistoryBuilder {
6974
original_run_id: extract_original_run_id_from_events(&events)
7075
.expect("Run id must be discoverable")
7176
.to_string(),
77+
use_improved_heartbeat_heuristic: false,
78+
has_seen_wft_completed: false,
7279
events,
7380
}
7481
}
@@ -89,6 +96,17 @@ impl TestHistoryBuilder {
8996
self.add(attribs)
9097
}
9198

99+
/// Enable the improved heartbeat heuristic flag for this builder. When enabled,
100+
/// the first WFTCompleted event gets the cumulative `ImprovedHeartbeatHeuristic` flag.
101+
pub fn set_use_improved_heartbeat_heuristic(&mut self) {
102+
if self.has_seen_wft_completed {
103+
panic!(
104+
"Improved heartbeat heuristic can only be enabled before the first WFTCompleted"
105+
);
106+
}
107+
self.use_improved_heartbeat_heuristic = true;
108+
}
109+
92110
/// Adds the following events:
93111
/// ```text
94112
/// EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
@@ -127,6 +145,13 @@ impl TestHistoryBuilder {
127145
..Default::default()
128146
});
129147
self.previous_task_completed_id = id;
148+
if self.use_improved_heartbeat_heuristic && !self.has_seen_wft_completed {
149+
self.set_flags_on_wft_completed(
150+
id,
151+
&[CoreInternalFlags::ImprovedHeartbeatHeuristic as u32],
152+
);
153+
}
154+
self.has_seen_wft_completed = true;
130155
}
131156

132157
/// Add a workflow task timed out event.
@@ -608,6 +633,17 @@ impl TestHistoryBuilder {
608633
Self::set_flags(self.events.iter_mut().rev(), core, lang)
609634
}
610635

636+
/// Sets core flags on the WFTCompleted event with the given event ID.
637+
pub fn set_flags_on_wft_completed(&mut self, event_id: i64, core: &[u32]) {
638+
if let Some(event) = self.events.iter_mut().find(|e| e.event_id == event_id)
639+
&& let Some(Attributes::WorkflowTaskCompletedEventAttributes(ref mut a)) =
640+
event.attributes
641+
{
642+
let sdk_dat = a.sdk_metadata.get_or_insert_with(Default::default);
643+
sdk_dat.core_used_flags.extend_from_slice(core);
644+
}
645+
}
646+
611647
/// Get the event ID of the most recently added event
612648
pub fn current_event_id(&self) -> i64 {
613649
self.current_event_id
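
The builder changes above boil down to two rules: the flag is attached only to the first WFTCompleted, and it cannot be enabled once that event exists. A minimal sketch of that state machine follows. `SketchBuilder` and its methods are hypothetical names, flags are stored per completion in a plain `Vec`, and the guard returns a `Result` here instead of panicking as `set_use_improved_heartbeat_heuristic` does, purely so the behavior is easy to assert on.

```rust
/// Hypothetical, stripped-down model of the builder's flag bookkeeping.
#[derive(Debug, Default)]
pub struct SketchBuilder {
    use_improved_heartbeat_heuristic: bool,
    has_seen_wft_completed: bool,
    /// Core flags recorded on each WFTCompleted, in order of emission.
    pub completed_flags: Vec<Vec<u32>>,
}

impl SketchBuilder {
    /// Raw value of `ImprovedHeartbeatHeuristic` as shown in the diff.
    pub const IMPROVED_HEARTBEAT_HEURISTIC: u32 = 4;

    /// Enables the flag; rejected once a WFTCompleted has been emitted.
    pub fn enable_improved_heuristic(&mut self) -> Result<(), &'static str> {
        if self.has_seen_wft_completed {
            return Err("can only be enabled before the first WFTCompleted");
        }
        self.use_improved_heartbeat_heuristic = true;
        Ok(())
    }

    /// Records a WFTCompleted, attaching the flag only to the first one.
    pub fn add_wft_completed(&mut self) {
        let mut flags = Vec::new();
        if self.use_improved_heartbeat_heuristic && !self.has_seen_wft_completed {
            flags.push(Self::IMPROVED_HEARTBEAT_HEURISTIC);
        }
        self.completed_flags.push(flags);
        self.has_seen_wft_completed = true;
    }
}
```

Setting `has_seen_wft_completed` unconditionally after each completion is what keeps the flag from leaking onto later tasks, matching the order of operations in the diff.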
