Skip to content

Commit 1fe602d

Browse files
committed
Don't bother with balancer when not using sticky queues
1 parent 37f8923 commit 1fe602d

2 files changed

Lines changed: 26 additions & 22 deletions

File tree

core/src/pollers/poll_buffer.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub(crate) struct LongPollBuffer<T, SK: SlotKind> {
5353
}
5454

5555
pub(crate) struct WorkflowTaskOptions {
56-
pub(crate) wft_poller_shared: Arc<WFTPollerShared>,
56+
pub(crate) wft_poller_shared: Option<Arc<WFTPollerShared>>,
5757
}
5858

5959
pub(crate) struct ActivityTaskOptions {
@@ -77,27 +77,27 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
7777
) -> Self {
7878
let is_sticky = sticky_queue.is_some();
7979
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone());
80-
if is_sticky {
81-
options
82-
.wft_poller_shared
83-
.set_sticky_active(poll_scaler.active_rx.clone());
84-
} else {
85-
options
86-
.wft_poller_shared
87-
.set_non_sticky_active(poll_scaler.active_rx.clone());
88-
};
89-
let shared = options.wft_poller_shared.clone();
90-
let pre_permit_delay = Some(move || {
91-
let shared = shared.clone();
92-
async move {
93-
shared.wait_if_needed(is_sticky).await;
80+
if let Some(wftps) = options.wft_poller_shared.as_ref() {
81+
if is_sticky {
82+
wftps.set_sticky_active(poll_scaler.active_rx.clone());
83+
} else {
84+
wftps.set_non_sticky_active(poll_scaler.active_rx.clone());
85+
};
86+
}
87+
let pre_permit_delay = options.wft_poller_shared.clone().map(|wftps| {
88+
move || {
89+
let shared = wftps.clone();
90+
async move {
91+
shared.wait_if_needed(is_sticky).await;
92+
}
9493
}
9594
});
96-
let post_poll_fn = Some(move |t: &PollWorkflowTaskQueueResponse| {
97-
if is_sticky {
98-
options
99-
.wft_poller_shared
100-
.record_sticky_backlog(t.backlog_count_hint as usize)
95+
96+
let post_poll_fn = options.wft_poller_shared.clone().map(|wftps| {
97+
move |t: &PollWorkflowTaskQueueResponse| {
98+
if is_sticky {
99+
wftps.record_sticky_backlog(t.backlog_count_hint as usize)
100+
}
101101
}
102102
});
103103
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
@@ -724,7 +724,7 @@ mod tests {
724724
CancellationToken::new(),
725725
None::<fn(usize)>,
726726
WorkflowTaskOptions {
727-
wft_poller_shared: Arc::new(WFTPollerShared::new()),
727+
wft_poller_shared: Some(Arc::new(WFTPollerShared::new())),
728728
},
729729
);
730730

core/src/worker/workflow/wft_poller.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ pub(crate) fn make_wft_poller(
3434
> + Sized
3535
+ 'static {
3636
let wft_metrics = metrics.with_new_attrs([workflow_poller()]);
37-
let wft_poller_shared = Arc::new(WFTPollerShared::new());
37+
let wft_poller_shared = if sticky_queue_name.is_some() {
38+
Some(Arc::new(WFTPollerShared::new()))
39+
} else {
40+
None
41+
};
3842
let wf_task_poll_buffer = LongPollBuffer::new_workflow_task(
3943
client.clone(),
4044
config.task_queue.clone(),

0 commit comments

Comments
 (0)