Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,9 @@ impl WorkflowMachines {
self.replaying,
attrs.deprecated,
encountered_entry.is_some(),
self.encountered_patch_markers.keys().map(|s| s.as_str()),
self.encountered_patch_markers
.iter()
.filter_map(|(k, ci)| ci.created_command.then_some(k.as_str())),
self.observed_internal_flags.clone(),
)?;
let mkey = self.add_cmd_to_wf_task(
Expand Down
75 changes: 74 additions & 1 deletion crates/sdk-core/tests/integ_tests/workflow_tests/patches.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::common::{ActivationAssertionsInterceptor, CoreWfStarter, build_fake_sdk};
use crate::common::{
ActivationAssertionsInterceptor, CoreWfStarter, WorkflowHandleExt, build_fake_sdk,
};
use std::{
collections::{HashSet, VecDeque, hash_map::RandomState},
sync::{
Expand Down Expand Up @@ -28,6 +30,7 @@ use temporalio_common::{
history::v1::{
ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes,
ActivityTaskStartedEventAttributes, TimerFiredEventAttributes,
history_event::Attributes as EventAttributes,
},
},
},
Expand Down Expand Up @@ -877,3 +880,73 @@ async fn many_patches_combine_in_search_attrib_update(#[case] num_patches: usize
worker.register_workflow_with_factory(move || ManyPatchesWf { num_patches });
worker.run().await.unwrap();
}

// Number of `patched()` calls the workflow issues within a single workflow task.
// Chosen large enough that the combined set of patch ids overflows the search
// attribute payload size limit (verified by the assertion in
// patch_marker_size_overflow_replay_is_deterministic).
const MANY_PATCHES_IN_ONE_WFT_COUNT: usize = 200;

/// Workflow that issues `MANY_PATCHES_IN_ONE_WFT_COUNT` patch markers back to
/// back within one workflow task, then waits on a short timer before completing.
#[workflow]
#[derive(Default)]
struct ManyPatchesInOneWftWf;

#[workflow_methods]
impl ManyPatchesInOneWftWf {
    #[run(name = DEFAULT_WORKFLOW_TYPE)]
    async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
        // Emit every patch before the first await point so they all land in the
        // same workflow task. Patch ids are "patch-1" through "patch-N".
        for patch_num in 0..MANY_PATCHES_IN_ONE_WFT_COUNT {
            let _ = ctx.patched(&format!("patch-{}", patch_num + 1));
        }
        ctx.timer(Duration::from_millis(1)).await;
        Ok(())
    }
}

// Unlike many_patches_combine_in_search_attrib_update, this test issues all of
// its patches within a single WFT rather than spreading them across several.
// See https://github.com/temporalio/sdk-core/issues/1223.
#[tokio::test]
async fn patch_marker_size_overflow_replay_is_deterministic() {
    let wf_name = "patch_marker_size_overflow_replay_is_deterministic";
    let mut starter = CoreWfStarter::new(wf_name);
    starter.sdk_config.task_types = WorkerTaskTypes::workflow_only();
    let mut worker = starter.worker().await;
    worker.register_workflow::<ManyPatchesInOneWftWf>();

    let task_queue = starter.get_task_queue().to_owned();
    let start_opts = WorkflowStartOptions::new(task_queue, wf_name.to_owned()).build();
    let handle = worker
        .submit_workflow(ManyPatchesInOneWftWf::run, (), start_opts)
        .await
        .unwrap();
    worker.run_until_done().await.unwrap();

    // Sanity-check that the original execution really hit the size limit: the
    // final upsert-SA event in history must carry fewer patch ids than the
    // workflow issued.
    let history = handle.fetch_history(Default::default()).await.unwrap();
    let mut last_upsert_patches = None;
    for event in history.events().iter().rev() {
        if let Some(EventAttributes::UpsertWorkflowSearchAttributesEventAttributes(attrs)) =
            &event.attributes
        {
            let patches = attrs
                .search_attributes
                .as_ref()
                .and_then(|sa| sa.indexed_fields.get(VERSION_SEARCH_ATTR_KEY))
                .map(|p| HashSet::<String, RandomState>::from_json_payload(p).unwrap());
            // Stop at the most recent upsert event that actually contains the
            // version search attribute; keep scanning otherwise.
            if patches.is_some() {
                last_upsert_patches = patches;
                break;
            }
        }
    }
    let last_upsert_patches = last_upsert_patches
        .expect("history should contain at least one UpsertWorkflowSearchAttributes event");
    assert!(
        last_upsert_patches.len() < MANY_PATCHES_IN_ONE_WFT_COUNT,
        "expected the last upsert SA event to be missing patches due to size overflow, \
        but it contained all {MANY_PATCHES_IN_ONE_WFT_COUNT} of them",
    );

    // Replay the workflow from the fetched history. This must succeed: the SDK must produce the
    // same sequence of upsert SA commands during replay as it did during the original execution.
    handle
        .fetch_history_and_replay(worker.inner_mut())
        .await
        .unwrap();
}
Loading