@@ -31,12 +31,19 @@ pub(crate) fn make_wft_poller(
3131> + Sized
3232+ ' static {
3333 let wft_metrics = metrics. with_new_attrs ( [ workflow_poller ( ) ] ) ;
34- let wft_poller_shared = Arc :: new ( WFTPollerShared :: new ( ) ) ;
34+ let poller_behavior = wft_poller_behavior ( config, false ) ;
35+ let wft_poller_shared = if sticky_queue_name. is_some ( ) {
36+ Some ( Arc :: new ( WFTPollerShared :: new (
37+ wft_slots. available_permits ( ) ,
38+ ) ) )
39+ } else {
40+ None
41+ } ;
3542 let wf_task_poll_buffer = LongPollBuffer :: new_workflow_task (
3643 client. clone ( ) ,
3744 config. task_queue . clone ( ) ,
3845 None ,
39- wft_poller_behavior ( config , false ) ,
46+ poller_behavior ,
4047 wft_slots. clone ( ) ,
4148 shutdown_token. child_token ( ) ,
4249 Some ( move |np| {
@@ -74,14 +81,16 @@ pub(crate) struct WFTPollerShared {
7481 last_seen_sticky_backlog : ( watch:: Receiver < usize > , watch:: Sender < usize > ) ,
7582 sticky_active : OnceLock < watch:: Receiver < usize > > ,
7683 non_sticky_active : OnceLock < watch:: Receiver < usize > > ,
84+ max_slots : Option < usize > ,
7785}
7886impl WFTPollerShared {
79- pub ( crate ) fn new ( ) -> Self {
87+ pub ( crate ) fn new ( max_slots : Option < usize > ) -> Self {
8088 let ( tx, rx) = watch:: channel ( 0 ) ;
8189 Self {
8290 last_seen_sticky_backlog : ( rx, tx) ,
8391 sticky_active : OnceLock :: new ( ) ,
8492 non_sticky_active : OnceLock :: new ( ) ,
93+ max_slots,
8594 }
8695 }
8796 pub ( crate ) fn set_sticky_active ( & self , rx : watch:: Receiver < usize > ) {
@@ -95,36 +104,49 @@ impl WFTPollerShared {
95104 pub ( crate ) async fn wait_if_needed ( & self , is_sticky : bool ) {
96105 // If there's a sticky backlog, prioritize it.
97106 if !is_sticky {
98- let backlog = * self . last_seen_sticky_backlog . 0 . borrow ( ) ;
99- if backlog > 1 {
100- let _ = self
101- . last_seen_sticky_backlog
102- . 0
103- . clone ( )
104- . wait_for ( |v| * v <= 1 )
105- . await ;
106- }
107+ let _ = self
108+ . last_seen_sticky_backlog
109+ . 0
110+ . clone ( )
111+ . wait_for ( |v| * v == 0 )
112+ . await ;
107113 }
108114
109- // If there's no meaningful sticky backlog, balance poller counts
110- if * self . last_seen_sticky_backlog . 0 . borrow ( ) <= 1 {
115+ // We need to make sure there's at least one poller of both kinds available. So, we check
116+ // that we won't end up using every available permit with one kind of poller. In practice
117+ // this is only ever likely to be an issue with very small numbers of slots.
118+ if let Some ( max_slots) = self . max_slots {
111119 if let Some ( ( sticky_active, non_sticky_active) ) =
112120 self . sticky_active . get ( ) . zip ( self . non_sticky_active . get ( ) )
113121 {
114- if is_sticky {
115- let _ = sticky_active
116- . clone ( )
117- . wait_for ( |v| * v <= * non_sticky_active. borrow ( ) )
118- . await ;
119- } else {
120- let _ = non_sticky_active
121- . clone ( )
122- . wait_for ( |v| * v <= * sticky_active. borrow ( ) )
123- . await ;
122+ let mut sticky_active = sticky_active. clone ( ) ;
123+ let mut non_sticky_active = non_sticky_active. clone ( ) ;
124+ loop {
125+ let num_sticky_active = * sticky_active. borrow_and_update ( ) ;
126+ let num_non_sticky_active = * non_sticky_active. borrow_and_update ( ) ;
127+ let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0 ;
128+ if both_are_zero {
129+ if !is_sticky {
130+ break ;
131+ }
132+ } else {
133+ let would_exceed_max_slots =
134+ ( num_sticky_active + num_non_sticky_active + 1 ) >= max_slots;
135+ let must_wait = would_exceed_max_slots
136+ && ( num_sticky_active == 0 || num_non_sticky_active == 0 ) ;
137+ if !must_wait {
138+ break ;
139+ }
140+ }
141+ tokio:: select! {
142+ _ = sticky_active. changed( ) => ( ) ,
143+ _ = non_sticky_active. changed( ) => ( ) ,
144+ }
124145 }
125146 }
126147 }
127148 }
149+
128150 pub ( crate ) fn record_sticky_backlog ( & self , v : usize ) {
129151 let _ = self . last_seen_sticky_backlog . 1 . send ( v) ;
130152 }
0 commit comments