From 61d4cbf801f5411185aa18dc7eaae8c8edad9746 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Mon, 20 Apr 2026 16:02:08 -0400 Subject: [PATCH] Fix NDE when multiple patches in a single WFT exceed SA's 2048-byte limit --- .../workflow/machines/workflow_machines.rs | 4 +- .../integ_tests/workflow_tests/patches.rs | 75 ++++++++++++++++++- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 38704e2da..c5fceb231 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1433,7 +1433,9 @@ impl WorkflowMachines { self.replaying, attrs.deprecated, encountered_entry.is_some(), - self.encountered_patch_markers.keys().map(|s| s.as_str()), + self.encountered_patch_markers + .iter() + .filter_map(|(k, ci)| ci.created_command.then_some(k.as_str())), self.observed_internal_flags.clone(), )?; let mkey = self.add_cmd_to_wf_task( diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs index 283e4d10c..5ea63f774 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs @@ -1,4 +1,6 @@ -use crate::common::{ActivationAssertionsInterceptor, CoreWfStarter, build_fake_sdk}; +use crate::common::{ + ActivationAssertionsInterceptor, CoreWfStarter, WorkflowHandleExt, build_fake_sdk, +}; use std::{ collections::{HashSet, VecDeque, hash_map::RandomState}, sync::{ @@ -28,6 +30,7 @@ use temporalio_common::{ history::v1::{ ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes, ActivityTaskStartedEventAttributes, TimerFiredEventAttributes, + history_event::Attributes as EventAttributes, }, }, }, @@ -874,3 +877,73 @@ async fn many_patches_combine_in_search_attrib_update(#[case] num_patches: usize worker.register_workflow_with_factory(move || ManyPatchesWf { num_patches }); worker.run().await.unwrap(); } + +const MANY_PATCHES_IN_ONE_WFT_COUNT: usize = 200; + +#[workflow] +#[derive(Default)] +struct ManyPatchesInOneWftWf; + +#[workflow_methods] +impl ManyPatchesInOneWftWf { + #[run(name = DEFAULT_WORKFLOW_TYPE)] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + for i in 1..=MANY_PATCHES_IN_ONE_WFT_COUNT { + let _ = ctx.patched(&format!("patch-{i}")); + } + ctx.timer(Duration::from_millis(1)).await; + Ok(()) + } +} + +// The main difference with many_patches_combine_in_search_attrib_update are that +// this one creates multiple patches in a single WFT, rather than spread them out +// over multiple WFTs. See https://github.com/temporalio/sdk-core/issues/1223. +#[tokio::test] +async fn patch_marker_size_overflow_replay_is_deterministic() { + let wf_name = "patch_marker_size_overflow_replay_is_deterministic"; + let mut starter = CoreWfStarter::new(wf_name); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker = starter.worker().await; + worker.register_workflow::(); + + let task_queue = starter.get_task_queue().to_owned(); + let handle = worker + .submit_workflow( + ManyPatchesInOneWftWf::run, + (), + WorkflowStartOptions::new(task_queue, wf_name.to_owned()).build(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); + + // Confirm that the original execution did in fact hit the size limit: the last upsert SA + // event in history must contain fewer than the total number of patches issued by the workflow. + let history = handle.fetch_history(Default::default()).await.unwrap(); + let last_upsert_patches = history + .events() + .iter() + .rev() + .find_map(|e| match &e.attributes { + Some(EventAttributes::UpsertWorkflowSearchAttributesEventAttributes(a)) => a + .search_attributes + .as_ref() + .and_then(|sa| sa.indexed_fields.get(VERSION_SEARCH_ATTR_KEY)) + .map(|p| HashSet::::from_json_payload(p).unwrap()), + _ => None, + }) + .expect("history should contain at least one UpsertWorkflowSearchAttributes event"); + assert!( + last_upsert_patches.len() < MANY_PATCHES_IN_ONE_WFT_COUNT, + "expected the last upsert SA event to be missing patches due to size overflow, \ + but it contained all {MANY_PATCHES_IN_ONE_WFT_COUNT} of them", + ); + + // Replay the workflow from the fetched history. This must succeed: the SDK must produce the + // same sequence of upsert SA commands during replay as it did during the original execution. + handle + .fetch_history_and_replay(worker.inner_mut()) + .await + .unwrap(); +}