Skip to content

Commit d9e00e0

Browse files
committed
Just do the obvious thing with max slots
1 parent 5f34f30 commit d9e00e0

4 files changed

Lines changed: 50 additions & 17 deletions

File tree

core-api/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ pub enum PollerBehavior {
521521
/// requires a slot to be available before beginning polling.
522522
Autoscaling {
523523
/// At least this many poll calls will always be attempted (assuming slots are available).
524-
/// Cannot be less than two for workflow tasks, or one for other tasks.
524+
/// Cannot be zero.
525525
minimum: usize,
526526
/// At most this many poll calls will ever be open at once. Must be >= `minimum`.
527527
maximum: usize,

core/src/pollers/poll_buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ mod tests {
724724
CancellationToken::new(),
725725
None::<fn(usize)>,
726726
WorkflowTaskOptions {
727-
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(false))),
727+
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
728728
},
729729
);
730730

core/src/worker/workflow/wft_poller.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub(crate) fn make_wft_poller(
3434
let poller_behavior = wft_poller_behavior(config, false);
3535
let wft_poller_shared = if sticky_queue_name.is_some() {
3636
Some(Arc::new(WFTPollerShared::new(
37-
poller_behavior.is_autoscaling(),
37+
wft_slots.available_permits(),
3838
)))
3939
} else {
4040
None
@@ -81,16 +81,16 @@ pub(crate) struct WFTPollerShared {
8181
last_seen_sticky_backlog: (watch::Receiver<usize>, watch::Sender<usize>),
8282
sticky_active: OnceLock<watch::Receiver<usize>>,
8383
non_sticky_active: OnceLock<watch::Receiver<usize>>,
84-
using_autoscaling: bool,
84+
max_slots: Option<usize>,
8585
}
8686
impl WFTPollerShared {
87-
pub(crate) fn new(using_autoscaling: bool) -> Self {
87+
pub(crate) fn new(max_slots: Option<usize>) -> Self {
8888
let (tx, rx) = watch::channel(0);
8989
Self {
9090
last_seen_sticky_backlog: (rx, tx),
9191
sticky_active: OnceLock::new(),
9292
non_sticky_active: OnceLock::new(),
93-
using_autoscaling,
93+
max_slots,
9494
}
9595
}
9696
pub(crate) fn set_sticky_active(&self, rx: watch::Receiver<usize>) {
@@ -112,12 +112,10 @@ impl WFTPollerShared {
112112
.await;
113113
}
114114

115-
// Unless autoscaling, if there's no sticky backlog, balance poller counts. This logic
116-
// allows the poller to proceed if it has the same or fewer pollers as it's opposite. There
117-
// is a preference for the sticky poller when counts are equal. This does not mean we always
118-
// have equal numbers of pollers, as later on the scaler will also prevent polling based on
119-
// the scaling information provided independently by the sticky/nonsticky queues.
120-
if !self.using_autoscaling && *self.last_seen_sticky_backlog.0.borrow() == 0 {
115+
// We need to make sure there's at least one poller of both kinds available. So, we check
116+
// that we won't end up using every available permit with one kind of poller. In practice
117+
// this is only ever likely to be an issue with very small numbers of slots.
118+
if let Some(max_slots) = self.max_slots {
121119
if let Some((sticky_active, non_sticky_active)) =
122120
self.sticky_active.get().zip(self.non_sticky_active.get())
123121
{
@@ -126,8 +124,15 @@ impl WFTPollerShared {
126124
loop {
127125
let num_sticky_active = *sticky_active.borrow_and_update();
128126
let num_non_sticky_active = *non_sticky_active.borrow_and_update();
129-
if (is_sticky && num_sticky_active <= num_non_sticky_active)
130-
|| (!is_sticky && (num_non_sticky_active < num_sticky_active))
127+
let would_exceed_max_slots =
128+
(num_sticky_active + num_non_sticky_active) > max_slots;
129+
let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0;
130+
if !is_sticky && both_are_zero {
131+
break;
132+
}
133+
if would_exceed_max_slots
134+
&& ((!is_sticky && (num_sticky_active > 0))
135+
|| (is_sticky && (num_non_sticky_active > 0)))
131136
{
132137
break;
133138
}
@@ -139,6 +144,7 @@ impl WFTPollerShared {
139144
}
140145
}
141146
}
147+
142148
pub(crate) fn record_sticky_backlog(&self, v: usize) {
143149
let _ = self.last_seen_sticky_backlog.1.send(v);
144150
}

tests/integ_tests/polling_tests.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,29 @@ async fn switching_worker_client_changes_poll() {
212212
server2.shutdown().await.unwrap();
213213
}
214214

215+
#[rstest::rstest]
215216
#[tokio::test]
216-
async fn small_workflow_slots_and_pollers() {
217+
async fn small_workflow_slots_and_pollers(#[values(false, true)] use_autoscaling: bool) {
217218
let wf_name = "only_one_workflow_slot_and_two_pollers";
218219
let mut starter = CoreWfStarter::new(wf_name);
220+
if use_autoscaling {
221+
starter
222+
.worker_config
223+
.workflow_task_poller_behavior(PollerBehavior::Autoscaling {
224+
minimum: 1,
225+
maximum: 5,
226+
initial: 1,
227+
});
228+
} else {
229+
starter
230+
.worker_config
231+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2));
232+
}
219233
starter
220234
.worker_config
221235
.max_outstanding_workflow_tasks(2_usize)
222236
.max_outstanding_local_activities(1_usize)
223237
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1))
224-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2))
225238
.max_outstanding_activities(1_usize);
226239
let mut worker = starter.worker().await;
227240
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
@@ -246,9 +259,10 @@ async fn small_workflow_slots_and_pollers() {
246259
)
247260
.await
248261
.unwrap();
262+
let wf2id = format!("{}-2", starter.get_task_queue());
249263
worker
250264
.submit_wf(
251-
format!("{}-2", starter.get_task_queue()),
265+
wf2id.clone(),
252266
wf_name.to_owned(),
253267
vec![],
254268
WorkflowOptions::default(),
@@ -264,4 +278,17 @@ async fn small_workflow_slots_and_pollers() {
264278
.iter()
265279
.any(|e| e.event_type() == EventType::WorkflowTaskTimedOut);
266280
assert!(!any_task_timeouts);
281+
let history = starter
282+
.get_client()
283+
.await
284+
.get_workflow_execution_history(wf2id, None, vec![])
285+
.await
286+
.unwrap()
287+
.history
288+
.unwrap();
289+
let any_task_timeouts = history
290+
.events
291+
.iter()
292+
.any(|e| e.event_type() == EventType::WorkflowTaskTimedOut);
293+
assert!(!any_task_timeouts);
267294
}

0 commit comments

Comments
 (0)