Skip to content

Commit 61d4cbf

Browse files
committed
Fix NDE when multiple patches in a single WFT exceed SA's 2048-byte limit
1 parent 1e67b23 commit 61d4cbf

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1433,7 +1433,9 @@ impl WorkflowMachines {
14331433
self.replaying,
14341434
attrs.deprecated,
14351435
encountered_entry.is_some(),
1436-
self.encountered_patch_markers.keys().map(|s| s.as_str()),
1436+
self.encountered_patch_markers
1437+
.iter()
1438+
.filter_map(|(k, ci)| ci.created_command.then_some(k.as_str())),
14371439
self.observed_internal_flags.clone(),
14381440
)?;
14391441
let mkey = self.add_cmd_to_wf_task(

crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::common::{ActivationAssertionsInterceptor, CoreWfStarter, build_fake_sdk};
1+
use crate::common::{
2+
ActivationAssertionsInterceptor, CoreWfStarter, WorkflowHandleExt, build_fake_sdk,
3+
};
24
use std::{
35
collections::{HashSet, VecDeque, hash_map::RandomState},
46
sync::{
@@ -28,6 +30,7 @@ use temporalio_common::{
2830
history::v1::{
2931
ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes,
3032
ActivityTaskStartedEventAttributes, TimerFiredEventAttributes,
33+
history_event::Attributes as EventAttributes,
3134
},
3235
},
3336
},
@@ -874,3 +877,73 @@ async fn many_patches_combine_in_search_attrib_update(#[case] num_patches: usize
874877
worker.register_workflow_with_factory(move || ManyPatchesWf { num_patches });
875878
worker.run().await.unwrap();
876879
}
880+
881+
const MANY_PATCHES_IN_ONE_WFT_COUNT: usize = 200;
882+
883+
#[workflow]
884+
#[derive(Default)]
885+
struct ManyPatchesInOneWftWf;
886+
887+
#[workflow_methods]
888+
impl ManyPatchesInOneWftWf {
889+
#[run(name = DEFAULT_WORKFLOW_TYPE)]
890+
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
891+
for i in 1..=MANY_PATCHES_IN_ONE_WFT_COUNT {
892+
let _ = ctx.patched(&format!("patch-{i}"));
893+
}
894+
ctx.timer(Duration::from_millis(1)).await;
895+
Ok(())
896+
}
897+
}
898+
899+
// The main difference with many_patches_combine_in_search_attrib_update are that
900+
// this one creates multiple patches in a single WFT, rather than spread them out
901+
// over multiple WFTs. See https://github.com/temporalio/sdk-core/issues/1223.
902+
#[tokio::test]
903+
async fn patch_marker_size_overflow_replay_is_deterministic() {
904+
let wf_name = "patch_marker_size_overflow_replay_is_deterministic";
905+
let mut starter = CoreWfStarter::new(wf_name);
906+
starter.sdk_config.task_types = WorkerTaskTypes::workflow_only();
907+
let mut worker = starter.worker().await;
908+
worker.register_workflow::<ManyPatchesInOneWftWf>();
909+
910+
let task_queue = starter.get_task_queue().to_owned();
911+
let handle = worker
912+
.submit_workflow(
913+
ManyPatchesInOneWftWf::run,
914+
(),
915+
WorkflowStartOptions::new(task_queue, wf_name.to_owned()).build(),
916+
)
917+
.await
918+
.unwrap();
919+
worker.run_until_done().await.unwrap();
920+
921+
// Confirm that the original execution did in fact hit the size limit: the last upsert SA
922+
// event in history must contain fewer than the total number of patches issued by the workflow.
923+
let history = handle.fetch_history(Default::default()).await.unwrap();
924+
let last_upsert_patches = history
925+
.events()
926+
.iter()
927+
.rev()
928+
.find_map(|e| match &e.attributes {
929+
Some(EventAttributes::UpsertWorkflowSearchAttributesEventAttributes(a)) => a
930+
.search_attributes
931+
.as_ref()
932+
.and_then(|sa| sa.indexed_fields.get(VERSION_SEARCH_ATTR_KEY))
933+
.map(|p| HashSet::<String, RandomState>::from_json_payload(p).unwrap()),
934+
_ => None,
935+
})
936+
.expect("history should contain at least one UpsertWorkflowSearchAttributes event");
937+
assert!(
938+
last_upsert_patches.len() < MANY_PATCHES_IN_ONE_WFT_COUNT,
939+
"expected the last upsert SA event to be missing patches due to size overflow, \
940+
but it contained all {MANY_PATCHES_IN_ONE_WFT_COUNT} of them",
941+
);
942+
943+
// Replay the workflow from the fetched history. This must succeed: the SDK must produce the
944+
// same sequence of upsert SA commands during replay as it did during the original execution.
945+
handle
946+
.fetch_history_and_replay(worker.inner_mut())
947+
.await
948+
.unwrap();
949+
}

0 commit comments

Comments
 (0)