Skip to content

Commit 7be4134

Browse files
committed
Fix ingestor task not finishing
1 parent 0ade3b9 commit 7be4134

1 file changed

Lines changed: 17 additions & 13 deletions

File tree

core/src/pollers/poll_buffer.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
7676
options: WorkflowTaskOptions,
7777
) -> Self {
7878
let is_sticky = sticky_queue.is_some();
79-
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler);
79+
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone());
8080
if is_sticky {
8181
options
8282
.wft_poller_shared
@@ -183,7 +183,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
183183
}
184184
};
185185

186-
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler);
186+
let poll_scaler = PollScaler::new(poller_behavior, num_pollers_handler, shutdown.clone());
187187
Self::new(
188188
poll_fn,
189189
permit_dealer,
@@ -227,8 +227,8 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
227227
Self::new(
228228
poll_fn,
229229
permit_dealer,
230-
shutdown,
231-
PollScaler::new(poller_behavior, num_pollers_handler),
230+
shutdown.clone(),
231+
PollScaler::new(poller_behavior, num_pollers_handler, shutdown),
232232
None::<fn() -> BoxFuture<'static, ()>>,
233233
None::<fn(&PollNexusTaskQueueResponse)>,
234234
)
@@ -432,7 +432,11 @@ impl<F> PollScaler<F>
432432
where
433433
F: Fn(usize),
434434
{
435-
fn new(behavior: PollerBehavior, num_pollers_handler: Option<F>) -> Self {
435+
fn new(
436+
behavior: PollerBehavior,
437+
num_pollers_handler: Option<F>,
438+
shutdown: CancellationToken,
439+
) -> Self {
436440
let (active_tx, active_rx) = watch::channel(0);
437441
let num_pollers_handler = num_pollers_handler.map(Arc::new);
438442
let (min, max, target) = match behavior {
@@ -454,11 +458,14 @@ where
454458
scale_up_allowed: AtomicBool::new(true),
455459
});
456460
let rhc = report_handle.clone();
457-
let join_handle = if behavior.is_autoscaling() {
461+
let ingestor_task = if behavior.is_autoscaling() {
458462
Some(tokio::task::spawn(async move {
459463
let mut interval = tokio::time::interval(Duration::from_millis(100));
460464
loop {
461-
interval.tick().await;
465+
tokio::select! {
466+
_ = interval.tick() => {}
467+
_ = shutdown.cancelled() => { break; }
468+
}
462469
let ingested = rhc.ingested_this_period.swap(0, Ordering::Relaxed);
463470
let ingested_last = rhc.ingested_last_period.swap(ingested, Ordering::Relaxed);
464471
rhc.scale_up_allowed
@@ -473,7 +480,7 @@ where
473480
active_tx,
474481
active_rx,
475482
num_pollers_handler,
476-
ingestor_task: join_handle,
483+
ingestor_task,
477484
}
478485
}
479486

@@ -632,13 +639,10 @@ impl
632639

633640
/// Returns true for errors that the poller scaler wants to see
634641
fn poll_scaling_error_matcher(err: &tonic::Status) -> bool {
635-
if matches!(
642+
matches!(
636643
err.code(),
637644
Code::ResourceExhausted | Code::Cancelled | Code::DeadlineExceeded
638-
) {
639-
return true;
640-
}
641-
false
645+
)
642646
}
643647

644648
pub(crate) trait TaskPollerResult {

0 commit comments

Comments
 (0)