@@ -2,8 +2,8 @@ use crate::{
22 abstractions:: { ActiveCounter , MeteredPermitDealer , OwnedMeteredSemPermit , dbg_panic} ,
33 pollers:: { self , Poller } ,
44 worker:: {
5- ActivitySlotKind , NexusSlotKind , PollerBehavior , SlotKind , WFTPollerShared ,
6- WorkflowSlotKind ,
5+ ActivitySlotKind , NamespaceCapabilities , NexusSlotKind , PollerBehavior , SlotKind ,
6+ WFTPollerShared , WorkflowSlotKind ,
77 client:: { PollActivityOptions , PollOptions , PollWorkflowOptions , WorkerClient } ,
88 } ,
99} ;
@@ -77,16 +77,15 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
7777 num_pollers_handler : Option < impl Fn ( usize ) + Send + Sync + ' static > ,
7878 options : WorkflowTaskOptions ,
7979 last_successful_poll_time : Arc < AtomicCell < Option < SystemTime > > > ,
80- graceful_poll_shutdown : Arc < AtomicBool > ,
81- server_supports_autoscaling : Arc < AtomicBool > ,
80+ capabilities : Arc < NamespaceCapabilities > ,
8281 ) -> Self {
8382 let is_sticky = sticky_queue. is_some ( ) ;
8483 let poll_scaler = PollScaler :: new (
8584 poller_behavior,
8685 num_pollers_handler,
8786 shutdown. clone ( ) ,
8887 last_successful_poll_time,
89- server_supports_autoscaling ,
88+ capabilities . clone ( ) ,
9089 ) ;
9190 if let Some ( wftps) = options. wft_poller_shared . as_ref ( ) {
9291 if is_sticky {
@@ -142,7 +141,7 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
142141 poll_scaler,
143142 pre_permit_delay,
144143 post_poll_fn,
145- graceful_poll_shutdown ,
144+ capabilities ,
146145 )
147146 }
148147}
@@ -158,8 +157,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
158157 num_pollers_handler : Option < impl Fn ( usize ) + Send + Sync + ' static > ,
159158 options : ActivityTaskOptions ,
160159 last_successful_poll_time : Arc < AtomicCell < Option < SystemTime > > > ,
161- graceful_poll_shutdown : Arc < AtomicBool > ,
162- server_supports_autoscaling : Arc < AtomicBool > ,
160+ capabilities : Arc < NamespaceCapabilities > ,
163161 ) -> Self {
164162 let pre_permit_delay = options
165163 . max_worker_acts_per_second
@@ -204,7 +202,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
204202 num_pollers_handler,
205203 shutdown. clone ( ) ,
206204 last_successful_poll_time,
207- server_supports_autoscaling ,
205+ capabilities . clone ( ) ,
208206 ) ;
209207 Self :: new (
210208 poll_fn,
@@ -213,7 +211,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
213211 poll_scaler,
214212 pre_permit_delay,
215213 None :: < fn ( & PollActivityTaskQueueResponse ) > ,
216- graceful_poll_shutdown ,
214+ capabilities ,
217215 )
218216 }
219217}
@@ -229,8 +227,7 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
229227 num_pollers_handler : Option < impl Fn ( usize ) + Send + Sync + ' static > ,
230228 last_successful_poll_time : Arc < AtomicCell < Option < SystemTime > > > ,
231229 send_heartbeat : bool ,
232- graceful_poll_shutdown : Arc < AtomicBool > ,
233- server_supports_autoscaling : Arc < AtomicBool > ,
230+ capabilities : Arc < NamespaceCapabilities > ,
234231 ) -> Self {
235232 let no_retry = if matches ! ( poller_behavior, PollerBehavior :: Autoscaling { .. } ) {
236233 Some ( NoRetryOnMatching {
@@ -264,11 +261,11 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
264261 num_pollers_handler,
265262 shutdown,
266263 last_successful_poll_time,
267- server_supports_autoscaling ,
264+ capabilities . clone ( ) ,
268265 ) ,
269266 None :: < fn ( ) -> BoxFuture < ' static , ( ) > > ,
270267 None :: < fn ( & PollNexusTaskQueueResponse ) > ,
271- graceful_poll_shutdown ,
268+ capabilities ,
272269 )
273270 }
274271}
@@ -294,7 +291,7 @@ where
294291 mut poll_scaler : PollScaler < F > ,
295292 pre_permit_delay : Option < impl Fn ( ) -> DelayFut + Send + Sync + ' static > ,
296293 post_poll_fn : Option < impl Fn ( & T ) + Send + Sync + ' static > ,
297- graceful_shutdown : Arc < AtomicBool > ,
294+ capabilities : Arc < NamespaceCapabilities > ,
298295 ) -> Self
299296 where
300297 FT : Future < Output = pollers:: Result < T > > + Send ,
@@ -365,9 +362,9 @@ where
365362 } else {
366363 None
367364 } ;
368- let graceful_shutdown = graceful_shutdown . clone ( ) ;
365+ let capabilities = capabilities . clone ( ) ;
369366 let poll_task = tokio:: spawn ( async move {
370- let r = if graceful_shutdown . load ( Ordering :: Relaxed ) {
367+ let r = if capabilities . graceful_poll_shutdown ( ) {
371368 pf ( timeout_override) . await
372369 } else {
373370 let poll_interruptor = shutdown. cancelled ( ) . then ( |_| async move {
@@ -488,7 +485,7 @@ where
488485 num_pollers_handler : Option < F > ,
489486 shutdown : CancellationToken ,
490487 last_successful_poll_time : Arc < AtomicCell < Option < SystemTime > > > ,
491- server_supports_autoscaling : Arc < AtomicBool > ,
488+ capabilities : Arc < NamespaceCapabilities > ,
492489 ) -> Self {
493490 let ( active_tx, active_rx) = watch:: channel ( 0 ) ;
494491 let num_pollers_handler = num_pollers_handler. map ( Arc :: new) ;
@@ -505,7 +502,7 @@ where
505502 min,
506503 target : AtomicUsize :: new ( target) ,
507504 ever_saw_scaling_decision : AtomicBool :: default ( ) ,
508- server_supports_autoscaling ,
505+ capabilities ,
509506 behavior,
510507 ingested_this_period : Default :: default ( ) ,
511508 ingested_last_period : Default :: default ( ) ,
@@ -590,7 +587,7 @@ struct PollScalerReportHandle {
590587 min : usize ,
591588 target : AtomicUsize ,
592589 ever_saw_scaling_decision : AtomicBool ,
593- server_supports_autoscaling : Arc < AtomicBool > ,
590+ capabilities : Arc < NamespaceCapabilities > ,
594591 behavior : PollerBehavior ,
595592
596593 ingested_this_period : AtomicUsize ,
@@ -699,7 +696,7 @@ impl PollScalerReportHandle {
699696 /// autoscaling, it's safe to scale down without having seen a decision.
700697 fn can_scale_down ( & self ) -> bool {
701698 self . ever_saw_scaling_decision . load ( Ordering :: Relaxed )
702- || self . server_supports_autoscaling . load ( Ordering :: Relaxed )
699+ || self . capabilities . poller_autoscaling ( )
703700 }
704701}
705702
@@ -865,8 +862,10 @@ mod tests {
865862 wft_poller_shared : Some ( Arc :: new ( WFTPollerShared :: new ( Some ( 10 ) ) ) ) ,
866863 } ,
867864 Arc :: new ( AtomicCell :: new ( None ) ) ,
868- Arc :: new ( AtomicBool :: new ( false ) ) ,
869- Arc :: new ( AtomicBool :: new ( false ) ) ,
865+ Arc :: new ( NamespaceCapabilities {
866+ graceful_poll_shutdown : AtomicBool :: new ( false ) ,
867+ poller_autoscaling : AtomicBool :: new ( false ) ,
868+ } ) ,
870869 ) ;
871870
872871 // Poll a bunch of times, "interrupting" it each time, we should only actually have polled
@@ -923,8 +922,10 @@ mod tests {
923922 wft_poller_shared : Some ( Arc :: new ( WFTPollerShared :: new ( Some ( 1 ) ) ) ) ,
924923 } ,
925924 Arc :: new ( AtomicCell :: new ( None ) ) ,
926- Arc :: new ( AtomicBool :: new ( false ) ) ,
927- Arc :: new ( AtomicBool :: new ( false ) ) ,
925+ Arc :: new ( NamespaceCapabilities {
926+ graceful_poll_shutdown : AtomicBool :: new ( false ) ,
927+ poller_autoscaling : AtomicBool :: new ( false ) ,
928+ } ) ,
928929 ) ;
929930
930931 // Should not see error, unwraps should get empty response
@@ -1001,8 +1002,10 @@ mod tests {
10011002 wft_poller_shared : Some ( Arc :: new ( WFTPollerShared :: new ( Some ( 10 ) ) ) ) ,
10021003 } ,
10031004 Arc :: new ( AtomicCell :: new ( None ) ) ,
1004- Arc :: new ( AtomicBool :: new ( false ) ) ,
1005- Arc :: new ( AtomicBool :: new ( false ) ) ,
1005+ Arc :: new ( NamespaceCapabilities {
1006+ graceful_poll_shutdown : AtomicBool :: new ( false ) ,
1007+ poller_autoscaling : AtomicBool :: new ( false ) ,
1008+ } ) ,
10061009 ) ;
10071010
10081011 let first_task = pb. poll ( ) . await . expect ( "Should get first task" ) ;
@@ -1108,8 +1111,10 @@ mod tests {
11081111 wft_poller_shared : Some ( Arc :: new ( WFTPollerShared :: new ( Some ( 10 ) ) ) ) ,
11091112 } ,
11101113 Arc :: new ( AtomicCell :: new ( None ) ) ,
1111- Arc :: new ( AtomicBool :: new ( false ) ) ,
1112- Arc :: new ( AtomicBool :: new ( false ) ) ,
1114+ Arc :: new ( NamespaceCapabilities {
1115+ graceful_poll_shutdown : AtomicBool :: new ( false ) ,
1116+ poller_autoscaling : AtomicBool :: new ( false ) ,
1117+ } ) ,
11131118 ) ) ;
11141119
11151120 // Trigger the first poll to initialize and get the scaling decision
@@ -1190,8 +1195,10 @@ mod tests {
11901195 wft_poller_shared : None ,
11911196 } ,
11921197 Arc :: new ( AtomicCell :: new ( None ) ) ,
1193- Arc :: new ( AtomicBool :: new ( graceful) ) ,
1194- Arc :: new ( AtomicBool :: new ( false ) ) ,
1198+ Arc :: new ( NamespaceCapabilities {
1199+ graceful_poll_shutdown : AtomicBool :: new ( graceful) ,
1200+ poller_autoscaling : AtomicBool :: new ( false ) ,
1201+ } ) ,
11951202 ) ;
11961203
11971204 let first = pb. poll ( ) . await . unwrap ( ) . unwrap ( ) ;
@@ -1243,7 +1250,10 @@ mod tests {
12431250 min : minimum,
12441251 target : AtomicUsize :: new ( 10 ) ,
12451252 ever_saw_scaling_decision : AtomicBool :: new ( false ) ,
1246- server_supports_autoscaling : Arc :: new ( AtomicBool :: new ( supports_autoscaling) ) ,
1253+ capabilities : Arc :: new ( NamespaceCapabilities {
1254+ graceful_poll_shutdown : AtomicBool :: new ( false ) ,
1255+ poller_autoscaling : AtomicBool :: new ( supports_autoscaling) ,
1256+ } ) ,
12471257 behavior : PollerBehavior :: Autoscaling {
12481258 minimum,
12491259 maximum : 10 ,
0 commit comments