@@ -34,7 +34,7 @@ pub(crate) fn make_wft_poller(
3434 let poller_behavior = wft_poller_behavior ( config, false ) ;
3535 let wft_poller_shared = if sticky_queue_name. is_some ( ) {
3636 Some ( Arc :: new ( WFTPollerShared :: new (
37- poller_behavior . is_autoscaling ( ) ,
37+ wft_slots . available_permits ( ) ,
3838 ) ) )
3939 } else {
4040 None
@@ -81,16 +81,16 @@ pub(crate) struct WFTPollerShared {
8181 last_seen_sticky_backlog : ( watch:: Receiver < usize > , watch:: Sender < usize > ) ,
8282 sticky_active : OnceLock < watch:: Receiver < usize > > ,
8383 non_sticky_active : OnceLock < watch:: Receiver < usize > > ,
84- using_autoscaling : bool ,
84+ max_slots : Option < usize > ,
8585}
8686impl WFTPollerShared {
87- pub ( crate ) fn new ( using_autoscaling : bool ) -> Self {
87+ pub ( crate ) fn new ( max_slots : Option < usize > ) -> Self {
8888 let ( tx, rx) = watch:: channel ( 0 ) ;
8989 Self {
9090 last_seen_sticky_backlog : ( rx, tx) ,
9191 sticky_active : OnceLock :: new ( ) ,
9292 non_sticky_active : OnceLock :: new ( ) ,
93- using_autoscaling ,
93+ max_slots ,
9494 }
9595 }
9696 pub ( crate ) fn set_sticky_active ( & self , rx : watch:: Receiver < usize > ) {
@@ -112,12 +112,10 @@ impl WFTPollerShared {
112112 . await ;
113113 }
114114
115- // Unless autoscaling, if there's no sticky backlog, balance poller counts. This logic
116- // allows the poller to proceed if it has the same or fewer pollers as it's opposite. There
117- // is a preference for the sticky poller when counts are equal. This does not mean we always
118- // have equal numbers of pollers, as later on the scaler will also prevent polling based on
119- // the scaling information provided independently by the sticky/nonsticky queues.
120- if !self . using_autoscaling && * self . last_seen_sticky_backlog . 0 . borrow ( ) == 0 {
115+ // We need to make sure there's at least one poller of both kinds available. So, we check
116+ // that we won't end up using every available permit with one kind of poller. In practice
117+ // this is only ever likely to be an issue with very small numbers of slots.
118+ if let Some ( max_slots) = self . max_slots {
121119 if let Some ( ( sticky_active, non_sticky_active) ) =
122120 self . sticky_active . get ( ) . zip ( self . non_sticky_active . get ( ) )
123121 {
@@ -126,10 +124,19 @@ impl WFTPollerShared {
126124 loop {
127125 let num_sticky_active = * sticky_active. borrow_and_update ( ) ;
128126 let num_non_sticky_active = * non_sticky_active. borrow_and_update ( ) ;
129- if ( is_sticky && num_sticky_active <= num_non_sticky_active)
130- || ( !is_sticky && ( num_non_sticky_active < num_sticky_active) )
131- {
132- break ;
127+ let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0 ;
128+ if both_are_zero {
129+ if !is_sticky {
130+ break ;
131+ }
132+ } else {
133+ let would_exceed_max_slots =
134+ ( num_sticky_active + num_non_sticky_active + 1 ) >= max_slots;
135+ let must_wait = would_exceed_max_slots
136+ && ( num_sticky_active == 0 || num_non_sticky_active == 0 ) ;
137+ if !must_wait {
138+ break ;
139+ }
133140 }
134141 tokio:: select! {
135142 _ = sticky_active. changed( ) => ( ) ,
@@ -139,6 +146,7 @@ impl WFTPollerShared {
139146 }
140147 }
141148 }
149+
142150 pub ( crate ) fn record_sticky_backlog ( & self , v : usize ) {
143151 let _ = self . last_seen_sticky_backlog . 1 . send ( v) ;
144152 }
0 commit comments