Skip to content

Commit 14a75ad

Browse files
committed
Prevent overshooting / many idle pollers
1 parent 284a06c commit 14a75ad

11 files changed

Lines changed: 358 additions & 79 deletions

File tree

core/src/abstractions.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::{
77
fmt::{Debug, Formatter},
88
sync::{
99
atomic::{AtomicBool, AtomicUsize, Ordering},
10-
Arc,
10+
Arc, OnceLock,
1111
},
12+
time::Instant,
1213
};
1314
use temporal_sdk_core_api::worker::{
1415
SlotKind, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
@@ -122,6 +123,7 @@ where
122123
let supp_c = self.supplier.clone();
123124
let supp_c_c = self.supplier.clone();
124125
let mets = self.metrics_ctx.clone();
126+
let mets2 = self.metrics_ctx.clone();
125127
let metric_rec =
126128
// When being called from the drop impl, the permit isn't actually dropped yet, so
127129
// account for that with the `add_one` parameter.
@@ -134,6 +136,15 @@ where
134136
mets.task_slots_used((ep_rx_c.borrow().saturating_sub(unused) + extra) as u64);
135137
};
136138
let mrc = metric_rec.clone();
139+
// TODO: Delete or flag these new metrics
140+
let use_time = Arc::new(OnceLock::<Instant>::new());
141+
let ut = use_time.clone();
142+
let use_time_record = move || {
143+
if let Some(ut) = ut.get() {
144+
mets2.task_execution_latency(ut.elapsed());
145+
}
146+
};
147+
137148
mrc(false);
138149

139150
OwnedMeteredSemPermit {
@@ -144,11 +155,13 @@ where
144155
},
145156
use_fn: Box::new(move |info| {
146157
supp_c.mark_slot_used(info);
158+
let _ = use_time.set(Instant::now());
147159
metric_rec(false)
148160
}),
149161
release_fn: Box::new(move |info| {
150162
supp_c_c.release_slot(info);
151163
ep_tx_c.send_modify(|ep| *ep -= 1);
164+
use_time_record();
152165
mrc(true)
153166
}),
154167
}

core/src/core_tests/activity_tasks.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn max_activities_respected() {
9191
mock_client
9292
.expect_poll_activity_task()
9393
.times(3)
94-
.returning(move |_, _, _| Ok(tasks.pop_front().unwrap()));
94+
.returning(move |_, _| Ok(tasks.pop_front().unwrap()));
9595
mock_client
9696
.expect_complete_activity_task()
9797
.returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default()));
@@ -376,7 +376,7 @@ async fn many_concurrent_heartbeat_cancels() {
376376
let mut calls_map = HashMap::<_, i32>::new();
377377
mock_client
378378
.expect_poll_activity_task()
379-
.returning(move |_, _, _| poll_resps.pop_front().unwrap());
379+
.returning(move |_, _| poll_resps.pop_front().unwrap());
380380
mock_client
381381
.expect_cancel_activity_task()
382382
.returning(move |_, _| async move { Ok(Default::default()) }.boxed());
@@ -618,8 +618,8 @@ async fn max_tq_acts_set_passed_to_poll_properly() {
618618
let mut mock_client = mock_workflow_client();
619619
mock_client
620620
.expect_poll_activity_task()
621-
.returning(move |_, tps, _| {
622-
assert_eq!(tps, Some(rate));
621+
.returning(move |_, ao| {
622+
assert_eq!(ao.max_tasks_per_sec, Some(rate));
623623
Ok(PollActivityTaskQueueResponse {
624624
task_token: vec![1],
625625
..Default::default()
@@ -1027,7 +1027,7 @@ async fn cant_complete_activity_with_unset_result_payload() {
10271027
let mut mock_client = mock_workflow_client();
10281028
mock_client
10291029
.expect_poll_activity_task()
1030-
.returning(move |_, _, _| {
1030+
.returning(move |_, _| {
10311031
Ok(PollActivityTaskQueueResponse {
10321032
task_token: vec![1],
10331033
..Default::default()

core/src/core_tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async fn shutdown_interrupts_both_polls() {
5353
mock_client
5454
.expect_poll_activity_task()
5555
.times(1)
56-
.returning(move |_, _, _| {
56+
.returning(move |_, _| {
5757
async move {
5858
BARR.wait().await;
5959
sleep(Duration::from_secs(1)).await;

core/src/core_tests/workers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ async fn worker_does_not_panic_on_retry_exhaustion_of_nonfatal_net_err() {
267267
async fn worker_can_shutdown_after_never_polling_ok(#[values(true, false)] poll_workflow: bool) {
268268
let mut mock = mock_workflow_client();
269269
mock.expect_poll_activity_task()
270-
.returning(|_, _, _| Err(tonic::Status::permission_denied("you shall not pass")));
270+
.returning(|_, _| Err(tonic::Status::permission_denied("you shall not pass")));
271271
if poll_workflow {
272272
mock.expect_poll_workflow_task()
273273
.returning(|_, _| Err(tonic::Status::permission_denied("you shall not pass")));

core/src/pollers/poll_buffer.rs

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
abstractions::{dbg_panic, MeteredPermitDealer, OwnedMeteredSemPermit},
33
pollers::{self, Poller},
4-
worker::client::WorkerClient,
4+
worker::client::{PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient},
55
};
66
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
77
use governor::{Quota, RateLimiter};
@@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
2020
ActivitySlotKind, NexusSlotKind, PollerBehavior, SlotKind, WorkflowSlotKind,
2121
};
2222
use temporal_sdk_core_protos::temporal::api::{
23-
taskqueue::v1::{PollerScalingDecision, TaskQueue},
23+
taskqueue::v1::PollerScalingDecision,
2424
workflowservice::v1::{
2525
PollActivityTaskQueueResponse, PollNexusTaskQueueResponse, PollWorkflowTaskQueueResponse,
2626
},
@@ -55,7 +55,7 @@ where
5555
SK: SlotKind + 'static,
5656
{
5757
pub(crate) fn new<FT, DelayFut>(
58-
poll_fn: impl Fn() -> FT + Send + Sync + 'static,
58+
poll_fn: impl Fn(Option<Duration>) -> FT + Send + Sync + 'static,
5959
permit_dealer: MeteredPermitDealer<SK>,
6060
poller_behavior: PollerBehavior,
6161
shutdown: CancellationToken,
@@ -119,14 +119,23 @@ where
119119
let pf = pf.clone();
120120
let tx = tx.clone();
121121
let report_handle = poll_scaler.get_report_handle();
122+
// Reduce poll timeout if we're frequently getting tasks, to avoid having many
123+
// outstanding long polls for a full minute after a burst subsides.
124+
let timeout_override =
125+
if report_handle.ingested_this_period.load(Ordering::Relaxed) > 2 {
126+
Some(Duration::from_secs(10))
127+
} else {
128+
None
129+
};
122130
let poll_task = tokio::spawn(async move {
123131
let r = tokio::select! {
124-
r = pf() => r,
132+
r = pf(timeout_override) => r,
125133
_ = shutdown.cancelled() => return,
126134
};
127135
drop(active_guard);
128-
report_handle.poll_result(&r);
129-
let _ = tx.send(r.map(|r| (r, permit)));
136+
if report_handle.poll_result(&r) {
137+
let _ = tx.send(r.map(|r| (r, permit)));
138+
}
130139
});
131140
let _ = spawned_tx.send(poll_task);
132141
}
@@ -248,6 +257,21 @@ where
248257
target: AtomicUsize::new(target),
249258
ever_saw_scaling_decision: AtomicBool::default(),
250259
behavior,
260+
ingested_this_period: Default::default(),
261+
ingested_last_period: Default::default(),
262+
scale_up_allowed: AtomicBool::new(true),
263+
});
264+
let rhc = report_handle.clone();
265+
// TODO: Track task
266+
tokio::task::spawn(async move {
267+
let mut interval = tokio::time::interval(Duration::from_millis(100));
268+
loop {
269+
interval.tick().await;
270+
let ingested = rhc.ingested_this_period.swap(0, Ordering::Relaxed);
271+
let ingested_last = rhc.ingested_last_period.swap(ingested, Ordering::Relaxed);
272+
rhc.scale_up_allowed
273+
.store(ingested_last >= ingested, Ordering::Relaxed);
274+
}
251275
});
252276
Self {
253277
report_handle,
@@ -286,15 +310,23 @@ struct PollScalerReportHandle {
286310
target: AtomicUsize,
287311
ever_saw_scaling_decision: AtomicBool,
288312
behavior: PollerBehavior,
313+
314+
ingested_this_period: AtomicUsize,
315+
ingested_last_period: AtomicUsize,
316+
scale_up_allowed: AtomicBool,
289317
}
290318

291319
impl PollScalerReportHandle {
292-
fn poll_result(&self, res: &Result<impl TaskPollerResult, tonic::Status>) {
320+
/// Returns true if the response should be passed on, false if it should be swallowed
321+
fn poll_result(&self, res: &Result<impl TaskPollerResult, tonic::Status>) -> bool {
293322
match res {
294323
Ok(res) => {
295324
if let PollerBehavior::SimpleMaximum(_) = self.behavior {
296325
// We don't do auto-scaling with the simple max
297-
return;
326+
return true;
327+
}
328+
if !res.is_empty() {
329+
self.ingested_this_period.fetch_add(1, Ordering::Relaxed);
298330
}
299331
if let Some(scaling_decision) = res.scaling_decision() {
300332
match scaling_decision.poll_request_delta_suggestion.cmp(&0) {
@@ -304,10 +336,15 @@ impl PollScalerReportHandle {
304336
.poll_request_delta_suggestion
305337
.unsigned_abs() as usize,
306338
),
307-
cmp::Ordering::Greater => self.change_target(
308-
usize::saturating_add,
309-
scaling_decision.poll_request_delta_suggestion as usize,
310-
),
339+
cmp::Ordering::Greater => {
340+
// Only allow scale up if in the last period it increased ingestion rate
341+
if self.scale_up_allowed.load(Ordering::Relaxed) {
342+
self.change_target(
343+
usize::saturating_add,
344+
scaling_decision.poll_request_delta_suggestion as usize,
345+
)
346+
}
347+
}
311348
cmp::Ordering::Equal => {}
312349
}
313350
self.ever_saw_scaling_decision
@@ -334,9 +371,11 @@ impl PollScalerReportHandle {
334371
// reclaim this poller
335372
self.change_target(usize::saturating_sub, 1);
336373
}
374+
return false;
337375
}
338376
}
339377
}
378+
true
340379
}
341380

342381
#[inline]
@@ -405,24 +444,37 @@ pub(crate) type PollWorkflowTaskBuffer =
405444
LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind>;
406445
pub(crate) fn new_workflow_task_buffer(
407446
client: Arc<dyn WorkerClient>,
408-
task_queue: TaskQueue,
447+
task_queue: String,
448+
sticky_queue: Option<String>,
409449
poller_behavior: PollerBehavior,
410450
permit_dealer: MeteredPermitDealer<WorkflowSlotKind>,
411451
shutdown: CancellationToken,
412452
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
413453
) -> PollWorkflowTaskBuffer {
414-
let no_poll_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
454+
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
415455
Some(NoRetryOnMatching {
416456
predicate: poll_scaling_error_matcher,
417457
})
418458
} else {
419459
None
420460
};
421461
LongPollBuffer::new(
422-
move || {
462+
move |timeout_override| {
423463
let client = client.clone();
424464
let task_queue = task_queue.clone();
425-
async move { client.poll_workflow_task(task_queue, no_poll_retry).await }
465+
let sticky_queue_name = sticky_queue.clone();
466+
async move {
467+
client
468+
.poll_workflow_task(
469+
PollOptions {
470+
task_queue,
471+
no_retry,
472+
timeout_override,
473+
},
474+
PollWorkflowOptions { sticky_queue_name },
475+
)
476+
.await
477+
}
426478
},
427479
permit_dealer,
428480
poller_behavior,
@@ -449,20 +501,29 @@ pub(crate) fn new_activity_task_buffer(
449501
Quota::with_period(Duration::from_secs_f64(ps.recip()))
450502
.map(|q| Arc::new(RateLimiter::direct(q)))
451503
});
452-
let no_poll_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
504+
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
453505
Some(NoRetryOnMatching {
454506
predicate: poll_scaling_error_matcher,
455507
})
456508
} else {
457509
None
458510
};
459511
LongPollBuffer::new(
460-
move || {
512+
move |timeout_override| {
461513
let client = client.clone();
462514
let task_queue = task_queue.clone();
463515
async move {
464516
client
465-
.poll_activity_task(task_queue, max_tps, no_poll_retry)
517+
.poll_activity_task(
518+
PollOptions {
519+
task_queue,
520+
no_retry,
521+
timeout_override,
522+
},
523+
PollActivityOptions {
524+
max_tasks_per_sec: max_tps,
525+
},
526+
)
466527
.await
467528
}
468529
},
@@ -488,18 +549,26 @@ pub(crate) fn new_nexus_task_buffer(
488549
shutdown: CancellationToken,
489550
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
490551
) -> PollNexusTaskBuffer {
491-
let no_poll_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
552+
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
492553
Some(NoRetryOnMatching {
493554
predicate: poll_scaling_error_matcher,
494555
})
495556
} else {
496557
None
497558
};
498559
LongPollBuffer::new(
499-
move || {
560+
move |timeout_override| {
500561
let client = client.clone();
501562
let task_queue = task_queue.clone();
502-
async move { client.poll_nexus_task(task_queue, no_poll_retry).await }
563+
async move {
564+
client
565+
.poll_nexus_task(PollOptions {
566+
task_queue,
567+
no_retry,
568+
timeout_override,
569+
})
570+
.await
571+
}
503572
},
504573
semaphore,
505574
poller_behavior,
@@ -592,7 +661,6 @@ mod tests {
592661
};
593662
use futures_util::FutureExt;
594663
use std::time::Duration;
595-
use temporal_sdk_core_protos::temporal::api::enums::v1::TaskQueueKind;
596664
use tokio::select;
597665

598666
#[tokio::test]
@@ -611,11 +679,8 @@ mod tests {
611679

612680
let pb = new_workflow_task_buffer(
613681
Arc::new(mock_client),
614-
TaskQueue {
615-
name: "sometq".to_string(),
616-
kind: TaskQueueKind::Normal as i32,
617-
normal_name: "".to_string(),
618-
},
682+
"sometq".to_string(),
683+
None,
619684
PollerBehavior::SimpleMaximum(1),
620685
fixed_size_permit_dealer(10),
621686
CancellationToken::new(),

core/src/telemetry/metrics.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ struct Instruments {
5757
num_pollers: Arc<dyn Gauge>,
5858
task_slots_available: Arc<dyn Gauge>,
5959
task_slots_used: Arc<dyn Gauge>,
60+
task_execution_latency: Arc<dyn HistogramDuration>,
6061
sticky_cache_hit: Arc<dyn Counter>,
6162
sticky_cache_miss: Arc<dyn Counter>,
6263
sticky_cache_size: Arc<dyn Gauge>,
@@ -280,6 +281,13 @@ impl MetricsContext {
280281
self.instruments.task_slots_used.record(num, &self.kvs)
281282
}
282283

284+
/// Record time it took to execute a task
285+
pub(crate) fn task_execution_latency(&self, dur: Duration) {
286+
self.instruments
287+
.task_execution_latency
288+
.record(dur, &self.kvs);
289+
}
290+
283291
/// Record current number of pollers. Context should include poller type / task queue tag.
284292
pub(crate) fn record_num_pollers(&self, num: usize) {
285293
self.instruments.num_pollers.record(num as u64, &self.kvs);
@@ -470,6 +478,11 @@ impl Instruments {
470478
description: "Current number of used slots per task type".into(),
471479
unit: "".into(),
472480
}),
481+
task_execution_latency: meter.histogram_duration(MetricParameters {
482+
name: "task_execution_latency".into(),
483+
description: "Histogram of task execution latencies".into(),
484+
unit: "".into(),
485+
}),
473486
sticky_cache_hit: meter.counter(MetricParameters {
474487
name: "sticky_cache_hit".into(),
475488
description: "Count of times the workflow cache was used for a new workflow task"

0 commit comments

Comments
 (0)