Skip to content

Commit db9ecc1

Browse files
committed
Add wakeup count test
1 parent 02cbae3 commit db9ecc1

2 files changed

Lines changed: 51 additions & 2 deletions

File tree

sdk/src/workflow_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ impl WfContext {
418418
}
419419

420420
/// Wait for some condition to become true, yielding the workflow if it is not.
421-
pub fn wait_condition(&self, condition: impl Fn() -> bool) -> impl Future<Output = ()> {
421+
pub fn wait_condition(&self, mut condition: impl FnMut() -> bool) -> impl Future<Output = ()> {
422422
future::poll_fn(move |_cx: &mut Context<'_>| {
423423
if condition() {
424424
Poll::Ready(())

tests/integ_tests/workflow_tests/local_activities.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use rstest::Context;
55
use std::{
66
sync::{
77
Arc,
8-
atomic::{AtomicU8, AtomicUsize, Ordering},
8+
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
99
},
1010
time::Duration,
1111
};
@@ -859,3 +859,52 @@ async fn long_local_activity_with_update(
859859
inner_worker.set_worker_interceptor(FailOnNondeterminismInterceptor {});
860860
inner_worker.run().await.unwrap();
861861
}
862+
863+
#[tokio::test]
864+
async fn local_activity_with_heartbeat_only_causes_one_wakeup() {
865+
let wf_name = "local_activity_with_heartbeat_only_causes_one_wakeup";
866+
let mut starter = CoreWfStarter::new(wf_name);
867+
starter.workflow_options.task_timeout = Some(Duration::from_secs(1));
868+
let mut worker = starter.worker().await;
869+
870+
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| async move {
871+
let mut wakeup_counter = 1;
872+
let la_resolved = AtomicBool::new(false);
873+
tokio::join!(
874+
async {
875+
ctx.local_activity(LocalActivityOptions {
876+
activity_type: "delay".to_string(),
877+
input: "hi".as_json_payload().expect("serializes fine"),
878+
..Default::default()
879+
})
880+
.await;
881+
la_resolved.store(true, Ordering::Relaxed);
882+
},
883+
async {
884+
ctx.wait_condition(|| {
885+
wakeup_counter += 1;
886+
la_resolved.load(Ordering::Relaxed)
887+
})
888+
.await;
889+
}
890+
);
891+
Ok(().into())
892+
});
893+
worker.register_activity("delay", |_: ActContext, _: String| async {
894+
tokio::time::sleep(Duration::from_secs(6)).await;
895+
Ok(())
896+
});
897+
898+
let handle = starter.start_with_worker(wf_name, &mut worker).await;
899+
worker.run_until_done().await.unwrap();
900+
let res = handle
901+
.get_workflow_result(Default::default())
902+
.await
903+
.unwrap()
904+
.unwrap_success();
905+
let replay_res = handle
906+
.fetch_history_and_replay(worker.inner_mut())
907+
.await
908+
.unwrap();
909+
assert_eq!(res[0], replay_res.unwrap());
910+
}

0 commit comments

Comments
 (0)