Skip to content

Commit d514f44

Browse files
committed
Remove unneeded bits now that minimum 2-size is enforced
1 parent 24845a0 commit d514f44

1 file changed

Lines changed: 4 additions & 20 deletions

File tree

core/src/worker/workflow/wft_poller.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@ use crate::{
77
worker::{client::WorkerClient, wft_poller_behavior},
88
};
99
use futures_util::{Stream, stream};
10-
use std::sync::{
11-
Arc, OnceLock,
12-
atomic::{AtomicBool, Ordering},
13-
};
10+
use std::sync::{Arc, OnceLock};
1411
use temporal_sdk_core_api::worker::{WorkerConfig, WorkflowSlotKind};
1512
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
16-
use tokio::sync::{Notify, watch};
13+
use tokio::sync::watch;
1714
use tokio_util::sync::CancellationToken;
1815

1916
pub(crate) fn make_wft_poller(
@@ -81,8 +78,6 @@ pub(crate) struct WFTPollerShared {
8178
last_seen_sticky_backlog: (watch::Receiver<usize>, watch::Sender<usize>),
8279
sticky_active: OnceLock<watch::Receiver<usize>>,
8380
non_sticky_active: OnceLock<watch::Receiver<usize>>,
84-
wait_for_first_nonsticky_poll: Notify,
85-
have_done_first_poll: AtomicBool,
8681
}
8782
impl WFTPollerShared {
8883
pub(crate) fn new() -> Self {
@@ -91,8 +86,6 @@ impl WFTPollerShared {
9186
last_seen_sticky_backlog: (rx, tx),
9287
sticky_active: OnceLock::new(),
9388
non_sticky_active: OnceLock::new(),
94-
wait_for_first_nonsticky_poll: Notify::new(),
95-
have_done_first_poll: AtomicBool::new(false),
9689
}
9790
}
9891
pub(crate) fn set_sticky_active(&self, rx: watch::Receiver<usize>) {
@@ -104,10 +97,6 @@ impl WFTPollerShared {
10497
/// Makes either the sticky or non-sticky poller wait pre-permit-acquisition so that we can
10598
/// balance which kind of queue we poll appropriately.
10699
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
107-
// Sticky shouldn't start polling until after the first non-sticky poll has been allowed
108-
if is_sticky && !self.have_done_first_poll.load(Ordering::Relaxed) {
109-
self.wait_for_first_nonsticky_poll.notified().await;
110-
}
111100
// If there's a sticky backlog, prioritize it.
112101
if !is_sticky {
113102
let backlog = *self.last_seen_sticky_backlog.0.borrow();
@@ -135,14 +124,9 @@ impl WFTPollerShared {
135124
loop {
136125
let num_sticky_active = *sticky_active.borrow_and_update();
137126
let num_non_sticky_active = *non_sticky_active.borrow_and_update();
138-
if is_sticky && num_sticky_active <= num_non_sticky_active {
139-
break;
140-
} else if !is_sticky
141-
&& (num_non_sticky_active < num_sticky_active
142-
|| !self.have_done_first_poll.load(Ordering::Relaxed))
127+
if (is_sticky && num_sticky_active <= num_non_sticky_active)
128+
|| (!is_sticky && (num_non_sticky_active < num_sticky_active))
143129
{
144-
self.have_done_first_poll.store(true, Ordering::Relaxed);
145-
self.wait_for_first_nonsticky_poll.notify_waiters();
146130
break;
147131
}
148132
tokio::select! {

0 commit comments

Comments
 (0)