Skip to content

Commit 703fe6b

Browse files
authored
Fix NDE when multiple patches in a single WFT exceed SA's 2048-byte limit (#1225)
1 parent 9171aca commit 703fe6b

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1433,7 +1433,9 @@ impl WorkflowMachines {
14331433
self.replaying,
14341434
attrs.deprecated,
14351435
encountered_entry.is_some(),
1436-
self.encountered_patch_markers.keys().map(|s| s.as_str()),
1436+
self.encountered_patch_markers
1437+
.iter()
1438+
.filter_map(|(k, ci)| ci.created_command.then_some(k.as_str())),
14371439
self.observed_internal_flags.clone(),
14381440
)?;
14391441
let mkey = self.add_cmd_to_wf_task(

crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::common::{ActivationAssertionsInterceptor, CoreWfStarter, build_fake_sdk};
1+
use crate::common::{
2+
ActivationAssertionsInterceptor, CoreWfStarter, WorkflowHandleExt, build_fake_sdk,
3+
};
24
use std::{
35
collections::{HashSet, VecDeque, hash_map::RandomState},
46
sync::{
@@ -28,6 +30,7 @@ use temporalio_common::{
2830
history::v1::{
2931
ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes,
3032
ActivityTaskStartedEventAttributes, TimerFiredEventAttributes,
33+
history_event::Attributes as EventAttributes,
3134
},
3235
},
3336
},
@@ -877,3 +880,73 @@ async fn many_patches_combine_in_search_attrib_update(#[case] num_patches: usize
877880
worker.register_workflow_with_factory(move || ManyPatchesWf { num_patches });
878881
worker.run().await.unwrap();
879882
}
883+
884+
const MANY_PATCHES_IN_ONE_WFT_COUNT: usize = 200;
885+
886+
#[workflow]
887+
#[derive(Default)]
888+
struct ManyPatchesInOneWftWf;
889+
890+
#[workflow_methods]
891+
impl ManyPatchesInOneWftWf {
892+
#[run(name = DEFAULT_WORKFLOW_TYPE)]
893+
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
894+
for i in 1..=MANY_PATCHES_IN_ONE_WFT_COUNT {
895+
let _ = ctx.patched(&format!("patch-{i}"));
896+
}
897+
ctx.timer(Duration::from_millis(1)).await;
898+
Ok(())
899+
}
900+
}
901+
902+
// The main difference with many_patches_combine_in_search_attrib_update are that
903+
// this one creates multiple patches in a single WFT, rather than spread them out
904+
// over multiple WFTs. See https://github.com/temporalio/sdk-core/issues/1223.
905+
#[tokio::test]
906+
async fn patch_marker_size_overflow_replay_is_deterministic() {
907+
let wf_name = "patch_marker_size_overflow_replay_is_deterministic";
908+
let mut starter = CoreWfStarter::new(wf_name);
909+
starter.sdk_config.task_types = WorkerTaskTypes::workflow_only();
910+
let mut worker = starter.worker().await;
911+
worker.register_workflow::<ManyPatchesInOneWftWf>();
912+
913+
let task_queue = starter.get_task_queue().to_owned();
914+
let handle = worker
915+
.submit_workflow(
916+
ManyPatchesInOneWftWf::run,
917+
(),
918+
WorkflowStartOptions::new(task_queue, wf_name.to_owned()).build(),
919+
)
920+
.await
921+
.unwrap();
922+
worker.run_until_done().await.unwrap();
923+
924+
// Confirm that the original execution did in fact hit the size limit: the last upsert SA
925+
// event in history must contain fewer than the total number of patches issued by the workflow.
926+
let history = handle.fetch_history(Default::default()).await.unwrap();
927+
let last_upsert_patches = history
928+
.events()
929+
.iter()
930+
.rev()
931+
.find_map(|e| match &e.attributes {
932+
Some(EventAttributes::UpsertWorkflowSearchAttributesEventAttributes(a)) => a
933+
.search_attributes
934+
.as_ref()
935+
.and_then(|sa| sa.indexed_fields.get(VERSION_SEARCH_ATTR_KEY))
936+
.map(|p| HashSet::<String, RandomState>::from_json_payload(p).unwrap()),
937+
_ => None,
938+
})
939+
.expect("history should contain at least one UpsertWorkflowSearchAttributes event");
940+
assert!(
941+
last_upsert_patches.len() < MANY_PATCHES_IN_ONE_WFT_COUNT,
942+
"expected the last upsert SA event to be missing patches due to size overflow, \
943+
but it contained all {MANY_PATCHES_IN_ONE_WFT_COUNT} of them",
944+
);
945+
946+
// Replay the workflow from the fetched history. This must succeed: the SDK must produce the
947+
// same sequence of upsert SA commands during replay as it did during the original execution.
948+
handle
949+
.fetch_history_and_replay(worker.inner_mut())
950+
.await
951+
.unwrap();
952+
}

0 commit comments

Comments
 (0)