Skip to content

Commit dc127f5

Browse files
SushisourceMaxim Fateev
andauthored
Fix wait_condition to wake registered wakers on state_mut (temporalio#1118)
Co-authored-by: Maxim Fateev <mfateev@temporal.io>
1 parent c781a61 commit dc127f5

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

crates/sdk/src/workflow_context.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::{
2424
atomic::{AtomicBool, Ordering},
2525
mpsc::{Receiver, Sender},
2626
},
27-
task::Poll,
27+
task::{Poll, Waker},
2828
time::{Duration, SystemTime},
2929
};
3030
use temporalio_common::{
@@ -127,13 +127,18 @@ pub struct WorkflowContext<W> {
127127
sync: SyncWorkflowContext<W>,
128128
/// The workflow instance
129129
workflow_state: Rc<RefCell<W>>,
130+
/// Wakers registered by `wait_condition` futures. Drained and woken on
131+
/// every `state_mut` call so that waker-based combinators (e.g.
132+
/// `FuturesOrdered`) re-poll the condition after state changes.
133+
condition_wakers: Rc<RefCell<Vec<Waker>>>,
130134
}
131135

132136
impl<W> Clone for WorkflowContext<W> {
133137
fn clone(&self) -> Self {
134138
Self {
135139
sync: self.sync.clone(),
136140
workflow_state: self.workflow_state.clone(),
141+
condition_wakers: self.condition_wakers.clone(),
137142
}
138143
}
139144
}
@@ -791,6 +796,7 @@ impl<W> WorkflowContext<W> {
791796
_phantom: PhantomData,
792797
},
793798
workflow_state,
799+
condition_wakers: Rc::new(RefCell::new(Vec::new())),
794800
}
795801
}
796802

@@ -803,6 +809,7 @@ impl<W> WorkflowContext<W> {
803809
_phantom: PhantomData,
804810
},
805811
workflow_state: self.workflow_state.clone(),
812+
condition_wakers: self.condition_wakers.clone(),
806813
}
807814
}
808815

@@ -979,8 +986,16 @@ impl<W> WorkflowContext<W> {
979986
///
980987
/// The borrow is scoped to the closure and cannot escape, preventing
981988
/// borrows from being held across await points.
989+
///
990+
/// After the mutation, all wakers registered by pending `wait_condition`
991+
/// futures are woken so that waker-based combinators (e.g.
992+
/// `FuturesOrdered`) re-poll them on the next pass.
982993
pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
983-
f(&mut *self.workflow_state.borrow_mut())
994+
let result = f(&mut *self.workflow_state.borrow_mut());
995+
for waker in self.condition_wakers.borrow_mut().drain(..) {
996+
waker.wake();
997+
}
998+
result
984999
}
9851000

9861001
/// Wait for some condition on workflow state to become true, yielding the workflow if not.
@@ -991,10 +1006,11 @@ impl<W> WorkflowContext<W> {
9911006
&'a self,
9921007
mut condition: impl FnMut(&W) -> bool + 'a,
9931008
) -> impl Future<Output = ()> + 'a {
994-
future::poll_fn(move |_cx: &mut Context<'_>| {
1009+
future::poll_fn(move |cx: &mut Context<'_>| {
9951010
if condition(&*self.workflow_state.borrow()) {
9961011
Poll::Ready(())
9971012
} else {
1013+
self.condition_wakers.borrow_mut().push(cx.waker().clone());
9981014
Poll::Pending
9991015
}
10001016
})

crates/sdk/src/workflows.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
//! Example usage:
77
//! ```
88
//! use temporalio_macros::{workflow, workflow_methods};
9-
//! use temporalio_sdk::{SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowResult};
9+
//! use temporalio_sdk::{
10+
//! SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowResult,
11+
//! };
1012
//!
1113
//! #[workflow]
1214
//! pub struct MyWorkflow {

0 commit comments

Comments
 (0)