Skip to content

Commit 5f34f30

Browse files
committed
Don't attempt balance in autoscale mode
1 parent fa8d470 commit 5f34f30

2 files changed

Lines changed: 21 additions & 19 deletions

File tree

core/src/pollers/poll_buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ mod tests {
724724
CancellationToken::new(),
725725
None::<fn(usize)>,
726726
WorkflowTaskOptions {
727-
wft_poller_shared: Some(Arc::new(WFTPollerShared::new())),
727+
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(false))),
728728
},
729729
);
730730

core/src/worker/workflow/wft_poller.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@ pub(crate) fn make_wft_poller(
3131
> + Sized
3232
+ 'static {
3333
let wft_metrics = metrics.with_new_attrs([workflow_poller()]);
34+
let poller_behavior = wft_poller_behavior(config, false);
3435
let wft_poller_shared = if sticky_queue_name.is_some() {
35-
Some(Arc::new(WFTPollerShared::new()))
36+
Some(Arc::new(WFTPollerShared::new(
37+
poller_behavior.is_autoscaling(),
38+
)))
3639
} else {
3740
None
3841
};
3942
let wf_task_poll_buffer = LongPollBuffer::new_workflow_task(
4043
client.clone(),
4144
config.task_queue.clone(),
4245
None,
43-
wft_poller_behavior(config, false),
46+
poller_behavior,
4447
wft_slots.clone(),
4548
shutdown_token.child_token(),
4649
Some(move |np| {
@@ -78,14 +81,16 @@ pub(crate) struct WFTPollerShared {
7881
last_seen_sticky_backlog: (watch::Receiver<usize>, watch::Sender<usize>),
7982
sticky_active: OnceLock<watch::Receiver<usize>>,
8083
non_sticky_active: OnceLock<watch::Receiver<usize>>,
84+
using_autoscaling: bool,
8185
}
8286
impl WFTPollerShared {
83-
pub(crate) fn new() -> Self {
87+
pub(crate) fn new(using_autoscaling: bool) -> Self {
8488
let (tx, rx) = watch::channel(0);
8589
Self {
8690
last_seen_sticky_backlog: (rx, tx),
8791
sticky_active: OnceLock::new(),
8892
non_sticky_active: OnceLock::new(),
93+
using_autoscaling,
8994
}
9095
}
9196
pub(crate) fn set_sticky_active(&self, rx: watch::Receiver<usize>) {
@@ -99,23 +104,20 @@ impl WFTPollerShared {
99104
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
100105
// If there's a sticky backlog, prioritize it.
101106
if !is_sticky {
102-
let backlog = *self.last_seen_sticky_backlog.0.borrow();
103-
if backlog >= 1 {
104-
let _ = self
105-
.last_seen_sticky_backlog
106-
.0
107-
.clone()
108-
.wait_for(|v| *v == 0)
109-
.await;
110-
}
107+
let _ = self
108+
.last_seen_sticky_backlog
109+
.0
110+
.clone()
111+
.wait_for(|v| *v == 0)
112+
.await;
111113
}
112114

113-
// If there's no sticky backlog, balance poller counts. This logic allows the poller
114-
// to proceed if it has the same or fewer pollers as it's opposite. There is a preference
115-
// for the sticky poller when counts are equal. This does not mean we always have equal
116-
// numbers of pollers, as later on the scaler will also prevent polling based on the scaling
117-
// information provided independently by the sticky/nonsticky queues.
118-
if *self.last_seen_sticky_backlog.0.borrow() == 0 {
115+
// Unless autoscaling, if there's no sticky backlog, balance poller counts. This logic
116+
// allows the poller to proceed if it has the same or fewer pollers as it's opposite. There
117+
// is a preference for the sticky poller when counts are equal. This does not mean we always
118+
// have equal numbers of pollers, as later on the scaler will also prevent polling based on
119+
// the scaling information provided independently by the sticky/nonsticky queues.
120+
if !self.using_autoscaling && *self.last_seen_sticky_backlog.0.borrow() == 0 {
119121
if let Some((sticky_active, non_sticky_active)) =
120122
self.sticky_active.get().zip(self.non_sticky_active.get())
121123
{

0 commit comments

Comments
 (0)