
Commit 7b4e70c

Todo cleanup
1 parent 6f9801d commit 7b4e70c

4 files changed

Lines changed: 40 additions & 25 deletions


client/src/retry.rs

Lines changed: 38 additions & 3 deletions
@@ -1,5 +1,6 @@
 use crate::{
-    raw::IsUserLongPoll, Client, IsWorkerTaskLongPoll, NamespacedClient, Result, RetryConfig,
+    raw::IsUserLongPoll, Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result,
+    RetryConfig,
 };
 use backoff::{backoff::Backoff, exponential::ExponentialBackoff, Clock, SystemClock};
 use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -58,24 +59,27 @@ impl<SG> RetryClient<SG> {
         request: Option<&Request<R>>,
     ) -> CallInfo {
         let mut call_type = CallType::Normal;
+        let mut retry_short_circuit = None;
         if let Some(r) = request.as_ref() {
             let ext = r.extensions();
             if ext.get::<IsUserLongPoll>().is_some() {
                 call_type = CallType::UserLongPoll;
             } else if ext.get::<IsWorkerTaskLongPoll>().is_some() {
                 call_type = CallType::TaskLongPoll;
             }
+
+            retry_short_circuit = ext.get::<NoRetryOnMatching>().cloned();
         }
         let retry_cfg = if call_type == CallType::TaskLongPoll {
             RetryConfig::task_poll_retry_policy()
         } else {
             (*self.retry_config).clone()
         };
-        // TODO: Set retry short-circuits
         CallInfo {
             call_type,
             call_name,
             retry_cfg,
+            retry_short_circuit,
         }
     }
 
@@ -112,6 +116,7 @@ pub(crate) struct TonicErrorHandler<C: Clock> {
     call_type: CallType,
     call_name: &'static str,
     have_retried_goaway_cancel: bool,
+    retry_short_circuit: Option<NoRetryOnMatching>,
 }
 impl TonicErrorHandler<SystemClock> {
     fn new(call_info: CallInfo, throttle_cfg: RetryConfig) -> Self {
@@ -140,6 +145,7 @@ where
             backoff: call_info.retry_cfg.into_exp_backoff(clock),
             throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock),
             have_retried_goaway_cancel: false,
+            retry_short_circuit: call_info.retry_short_circuit,
         }
     }
 
@@ -165,11 +171,12 @@ where
     }
 }
 
-#[derive(Clone, Debug, PartialEq)]
+#[derive(Clone, Debug)]
 pub(crate) struct CallInfo {
     pub call_type: CallType,
     call_name: &'static str,
     retry_cfg: RetryConfig,
+    retry_short_circuit: Option<NoRetryOnMatching>,
 }
 
 #[doc(hidden)]
@@ -200,6 +207,12 @@ where
             return RetryPolicy::ForwardError(e);
         }
 
+        if let Some(sc) = self.retry_short_circuit.as_ref() {
+            if (sc.predicate)(&e) {
+                return RetryPolicy::ForwardError(e);
+            }
+        }
+
         // Task polls are OK with being cancelled or running into the timeout because there's
         // nothing to do but retry anyway
         let long_poll_allowed = self.call_type == CallType::TaskLongPoll
@@ -306,6 +319,7 @@ mod tests {
                 call_type: CallType::TaskLongPoll,
                 call_name,
                 retry_cfg: TEST_RETRY_CONFIG,
+                retry_short_circuit: None,
             },
             TEST_RETRY_CONFIG,
             FixedClock(Instant::now()),
@@ -333,6 +347,7 @@ mod tests {
                 call_type: CallType::TaskLongPoll,
                 call_name,
                 retry_cfg: TEST_RETRY_CONFIG,
+                retry_short_circuit: None,
             },
             TEST_RETRY_CONFIG,
             FixedClock(Instant::now()),
@@ -358,6 +373,7 @@ mod tests {
                 call_type: CallType::TaskLongPoll,
                 call_name: POLL_WORKFLOW_METH_NAME,
                 retry_cfg: TEST_RETRY_CONFIG,
+                retry_short_circuit: None,
             },
             RetryConfig {
                 initial_interval: Duration::from_millis(2),
@@ -388,6 +404,25 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn retry_short_circuit() {
+        let mut err_handler = TonicErrorHandler::new_with_clock(
+            CallInfo {
+                call_type: CallType::TaskLongPoll,
+                call_name: POLL_WORKFLOW_METH_NAME,
+                retry_cfg: TEST_RETRY_CONFIG,
+                retry_short_circuit: Some(NoRetryOnMatching {
+                    predicate: |s: &Status| s.code() == Code::ResourceExhausted,
+                }),
+            },
+            TEST_RETRY_CONFIG,
+            FixedClock(Instant::now()),
+            FixedClock(Instant::now()),
+        );
+        let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
+        assert_matches!(result, RetryPolicy::ForwardError(_))
+    }
+
     #[rstest::rstest]
     #[tokio::test]
     async fn task_poll_retries_forever<R>(
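
Note: the definition of NoRetryOnMatching is not part of this diff. As a rough sketch only, inferred from its usage above (a cloneable request extension holding a fn-pointer predicate over tonic::Status), it could look something like the following; the exact shape, field name, and attachment point are assumptions rather than the actual source.

// Hypothetical sketch: shape inferred from the diff above, not the real definition.
use tonic::{Code, Request, Status};

#[derive(Clone, Copy, Debug)]
pub struct NoRetryOnMatching {
    /// When this returns true for an error, the retry layer forwards the error
    /// immediately instead of backing off and retrying.
    pub predicate: fn(&Status) -> bool,
}

fn main() {
    // A caller could attach the marker as a request extension, mirroring how
    // the retry client reads it back with `ext.get::<NoRetryOnMatching>().cloned()`.
    let mut request = Request::new(());
    request.extensions_mut().insert(NoRetryOnMatching {
        predicate: |s: &Status| s.code() == Code::ResourceExhausted,
    });

    let short_circuit = request.extensions().get::<NoRetryOnMatching>().cloned();
    assert!(short_circuit.is_some());
    assert!((short_circuit.unwrap().predicate)(&Status::resource_exhausted("busy")));
}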

core-api/src/worker.rs

Lines changed: 0 additions & 16 deletions
@@ -218,22 +218,6 @@ impl WorkerConfigBuilder {
             return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
         }
 
-        // TODO: Does this really matter with how slot suppliers work now?
-        // let max_wft_polls = self
-        //     .max_concurrent_wft_polls
-        //     .unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
-        //
-        // // It wouldn't make any sense to have more outstanding polls than workflows we can possibly
-        // // cache. If we allow this at low values it's possible for sticky pollers to reserve all
-        // // available slots, crowding out the normal queue and gumming things up.
-        // if let Some(max_cache) = self.max_cached_workflows {
-        //     if max_cache > 0 && max_wft_polls > max_cache {
-        //         return Err(
-        //             "`max_concurrent_wft_polls` cannot exceed `max_cached_workflows`".to_owned(),
-        //         );
-        //     }
-        // }
-
         if self.use_worker_versioning.unwrap_or_default()
             && self
                 .worker_build_id

core/src/pollers/poll_buffer.rs

Lines changed: 2 additions & 5 deletions
@@ -298,7 +298,6 @@ impl PollScalerReportHandle {
                     return;
                 }
                 if let Some(scaling_decision) = res.scaling_decision() {
-                    warn!("Got sd {:?}", scaling_decision);
                     match scaling_decision.poller_delta.cmp(&0) {
                         cmp::Ordering::Less => self.change_target(
                             usize::saturating_sub,
@@ -316,18 +315,16 @@ impl PollScalerReportHandle {
                 // We want to avoid scaling down on empty polls if the server has never made any scaling
                 // decisions - otherwise we might never scale up again.
                 else if self.ever_saw_scaling_decision.load(Ordering::Relaxed) && res.is_empty() {
-                    warn!("Removing poller - empty response");
                     self.change_target(usize::saturating_sub, 1);
                 }
             }
             Err(e) => {
                 // We should only see (and react to) errors in autoscaling mode
                 if matches!(self.behavior, PollerBehavior::Autoscaling { .. }) {
-                    // TODO: Make debug before merge
-                    warn!("Got error from server: {:?}", e);
+                    debug!("Got error from server while polling: {:?}", e);
                     // TODO (REVIEW): A concern here is we can bounce off of ratelimiter
                     // because server keeps telling us "scale up!" and then we hit ratelimit
-                    // and halve again.
+                    // and halve again. Not necessarily a huge issue, but open to ideas to fix.
                     if e.code() == Code::ResourceExhausted {
                         // Scale down significantly for resource exhaustion
                         self.change_target(usize::saturating_div, 2);
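
For context, change_target is invoked with usize::saturating_sub / usize::saturating_div function pointers, but its implementation is not shown in this diff. Below is a minimal sketch of how such a helper could apply the operation and clamp the poller target to configured bounds; the struct, field names, and bounds are assumptions, not the actual PollScalerReportHandle code.

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the real report handle; only the target-adjustment
// logic is sketched here.
struct PollTarget {
    target: AtomicUsize,
    min_pollers: usize,
    max_pollers: usize,
}

impl PollTarget {
    // Apply a saturating operation (e.g. usize::saturating_sub or
    // usize::saturating_div) to the current target, then clamp the result.
    fn change_target(&self, op: fn(usize, usize) -> usize, amount: usize) {
        let current = self.target.load(Ordering::Relaxed);
        let new = op(current, amount).clamp(self.min_pollers, self.max_pollers);
        self.target.store(new, Ordering::Relaxed);
    }
}

fn main() {
    let t = PollTarget {
        target: AtomicUsize::new(8),
        min_pollers: 1,
        max_pollers: 16,
    };
    // RESOURCE_EXHAUSTED path from the diff: halve the poller target.
    t.change_target(usize::saturating_div, 2);
    assert_eq!(t.target.load(Ordering::Relaxed), 4);
    // Empty-poll path: drop the target by one.
    t.change_target(usize::saturating_sub, 1);
    assert_eq!(t.target.load(Ordering::Relaxed), 3);
}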

tests/manual_tests.rs

Lines changed: 0 additions & 1 deletion
@@ -118,7 +118,6 @@ async fn poller_load_spiky() {
     })
     .await;
     info!("Initial load ran for {:?}", start_processing.elapsed());
-    // TODO: Maybe send signals for round two
     ah.abort();
     // Wait a minute for poller count to drop
     tokio::time::sleep(Duration::from_secs(60)).await;
