Skip to content

Commit 93471ac

Browse files
authored
Poller auto-scaling (#874)
1 parent 95db75d commit 93471ac

36 files changed

Lines changed: 1708 additions & 462 deletions

.cargo/config.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CLI_VERSION_OVERRIDE = "v1.3.1-priority.0"
66
[alias]
77
integ-test = ["run", "--package", "temporal-sdk-core", "--example", "integ_runner", "--"]
88
lint = ["clippy", "--workspace", "--examples", "--all-features",
9-
"--test", "integ_tests", "--test", "heavy_tests", "--", "--D", "warnings"]
9+
"--test", "integ_tests", "--test", "heavy_tests", "--test", "manual_tests",
10+
"--", "--D", "warnings"]
1011
test-lint = ["clippy", "--all", "--all-features", "--examples", "--workspace",
1112
"--tests", "--", "--D", "warnings"]

.cargo/multi-worker-manual-test

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/bin/sh
2+
3+
# Use this script during manual testing against a server to run multiple instances
4+
# of a test, thus acting like multiple workers are running concurrently
5+
6+
# Usage: multi-worker-heavy-test <number of workers> <integ test name>
7+
8+
# You may want to set env vars for targeting cloud first, ex (fish syntax):
9+
# set -gx TEMPORAL_SERVICE_ADDRESS "https://sj-poller-test.e2e.tmprl-test.cloud:7233"
10+
# set -gx TEMPORAL_USE_TLS 1
11+
# set -gx TEMPORAL_NAMESPACE sj-poller-test.e2e
12+
13+
cargo integ-test -c "--release" -t manual_tests --just-build
14+
15+
parallel --line-buffer --process-slot-var=PAR_JOBNUM -j $1 cargo integ-test -c "--release" -s external -t manual_tests -- --nocapture $2 ::: $(seq 1 $1)

.github/workflows/per-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ concurrency:
1111
cancel-in-progress: true
1212

1313
jobs:
14-
build-and-test:
14+
build-and-lint:
1515
name: "Format, docs, and lint"
1616
timeout-minutes: 10
1717
runs-on: ubuntu-latest

.gitignore

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ Cargo.lock
77
/.idea/
88
*.iml
99
*~
10+
.aider*
1011

1112
# Ignore generated protobuf files
1213
src/protos/*.rs
1314
!src/protos/mod.rs
15+
16+
# Coverage
1417
/tarpaulin-report.html
1518
/machine_coverage/
1619
/bindings/
1720
/core/machine_coverage/
21+
22+
# Keep secrets here
1823
/.cloud_certs/
19-
.aider*
24+
cloud_envs.fish

client/src/lib.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ use temporal_sdk_core_protos::{
7171
},
7272
};
7373
use tonic::{
74-
Code, Status,
74+
Code,
7575
body::BoxBody,
7676
client::GrpcService,
7777
codegen::InterceptedService,
@@ -83,11 +83,6 @@ use tower::ServiceBuilder;
8383
use url::Url;
8484
use uuid::Uuid;
8585

86-
/// A request extension that, when set, should make the [RetryClient] consider this call to be a
87-
/// [CallType::TaskLongPoll]
88-
#[derive(Copy, Clone, Debug)]
89-
pub struct IsWorkerTaskLongPoll;
90-
9186
static CLIENT_NAME_HEADER_KEY: &str = "client-name";
9287
static CLIENT_VERSION_HEADER_KEY: &str = "client-version";
9388
static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
@@ -259,6 +254,18 @@ impl RetryConfig {
259254
}
260255
}
261256

257+
/// A retry policy that never retires
258+
pub const fn no_retries() -> Self {
259+
Self {
260+
initial_interval: Duration::from_secs(0),
261+
randomization_factor: 0.0,
262+
multiplier: 1.0,
263+
max_interval: Duration::from_secs(0),
264+
max_elapsed_time: None,
265+
max_retries: 1,
266+
}
267+
}
268+
262269
pub(crate) fn into_exp_backoff<C>(self, clock: C) -> exponential::ExponentialBackoff<C> {
263270
exponential::ExponentialBackoff {
264271
current_interval: self.initial_interval,
@@ -279,6 +286,20 @@ impl From<RetryConfig> for ExponentialBackoff {
279286
}
280287
}
281288

289+
/// A request extension that, when set, should make the [RetryClient] consider this call to be a
290+
/// [CallType::TaskLongPoll]
291+
#[derive(Copy, Clone, Debug)]
292+
pub struct IsWorkerTaskLongPoll;
293+
294+
/// A request extension that, when set, and a call is being processed by a [RetryClient], allows the
295+
/// caller to request certain matching errors to short-circuit-return immediately and not follow
296+
/// normal retry logic.
297+
#[derive(Copy, Clone, Debug)]
298+
pub struct NoRetryOnMatching {
299+
/// Return true if the passed-in gRPC error should be immediately returned to the caller
300+
pub predicate: fn(&tonic::Status) -> bool,
301+
}
302+
282303
impl Debug for ClientTlsConfig {
283304
// Intentionally omit details here since they could leak a key if ever printed
284305
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -514,7 +535,10 @@ pub struct ServiceCallInterceptor {
514535
impl Interceptor for ServiceCallInterceptor {
515536
/// This function will get called on each outbound request. Returning a `Status` here will
516537
/// cancel the request and have that status returned to the caller.
517-
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
538+
fn call(
539+
&mut self,
540+
mut request: tonic::Request<()>,
541+
) -> Result<tonic::Request<()>, tonic::Status> {
518542
let metadata = request.metadata_mut();
519543
if !metadata.contains_key(CLIENT_NAME_HEADER_KEY) {
520544
metadata.insert(

client/src/raw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1470,7 +1470,7 @@ mod tests {
14701470
.filter(|l| l.starts_with("rpc"))
14711471
.map(|l| {
14721472
let stripped = l.strip_prefix("rpc ").unwrap();
1473-
(stripped[..stripped.find('(').unwrap()]).trim()
1473+
stripped[..stripped.find('(').unwrap()].trim()
14741474
})
14751475
.collect();
14761476
let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();

client/src/retry.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
2-
Client, IsWorkerTaskLongPoll, NamespacedClient, Result, RetryConfig, raw::IsUserLongPoll,
2+
Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result, RetryConfig,
3+
raw::IsUserLongPoll,
34
};
45
use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
56
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -58,13 +59,16 @@ impl<SG> RetryClient<SG> {
5859
request: Option<&Request<R>>,
5960
) -> CallInfo {
6061
let mut call_type = CallType::Normal;
62+
let mut retry_short_circuit = None;
6163
if let Some(r) = request.as_ref() {
6264
let ext = r.extensions();
6365
if ext.get::<IsUserLongPoll>().is_some() {
6466
call_type = CallType::UserLongPoll;
6567
} else if ext.get::<IsWorkerTaskLongPoll>().is_some() {
6668
call_type = CallType::TaskLongPoll;
6769
}
70+
71+
retry_short_circuit = ext.get::<NoRetryOnMatching>().cloned();
6872
}
6973
let retry_cfg = if call_type == CallType::TaskLongPoll {
7074
RetryConfig::task_poll_retry_policy()
@@ -75,6 +79,7 @@ impl<SG> RetryClient<SG> {
7579
call_type,
7680
call_name,
7781
retry_cfg,
82+
retry_short_circuit,
7883
}
7984
}
8085

@@ -111,6 +116,7 @@ pub(crate) struct TonicErrorHandler<C: Clock> {
111116
call_type: CallType,
112117
call_name: &'static str,
113118
have_retried_goaway_cancel: bool,
119+
retry_short_circuit: Option<NoRetryOnMatching>,
114120
}
115121
impl TonicErrorHandler<SystemClock> {
116122
fn new(call_info: CallInfo, throttle_cfg: RetryConfig) -> Self {
@@ -139,6 +145,7 @@ where
139145
backoff: call_info.retry_cfg.into_exp_backoff(clock),
140146
throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock),
141147
have_retried_goaway_cancel: false,
148+
retry_short_circuit: call_info.retry_short_circuit,
142149
}
143150
}
144151

@@ -164,11 +171,12 @@ where
164171
}
165172
}
166173

167-
#[derive(Clone, Debug, PartialEq)]
174+
#[derive(Clone, Debug)]
168175
pub(crate) struct CallInfo {
169176
pub call_type: CallType,
170177
call_name: &'static str,
171178
retry_cfg: RetryConfig,
179+
retry_short_circuit: Option<NoRetryOnMatching>,
172180
}
173181

174182
#[doc(hidden)]
@@ -187,8 +195,6 @@ impl CallType {
187195
}
188196
}
189197

190-
// TODO: This ought to be configurable by the owner of the client. That way worker-specific concerns
191-
// don't need to exist in this crate.
192198
impl<C> ErrorHandler<tonic::Status> for TonicErrorHandler<C>
193199
where
194200
C: Clock,
@@ -201,6 +207,12 @@ where
201207
return RetryPolicy::ForwardError(e);
202208
}
203209

210+
if let Some(sc) = self.retry_short_circuit.as_ref() {
211+
if (sc.predicate)(&e) {
212+
return RetryPolicy::ForwardError(e);
213+
}
214+
}
215+
204216
// Task polls are OK with being cancelled or running into the timeout because there's
205217
// nothing to do but retry anyway
206218
let long_poll_allowed = self.call_type == CallType::TaskLongPoll
@@ -307,6 +319,7 @@ mod tests {
307319
call_type: CallType::TaskLongPoll,
308320
call_name,
309321
retry_cfg: TEST_RETRY_CONFIG,
322+
retry_short_circuit: None,
310323
},
311324
TEST_RETRY_CONFIG,
312325
FixedClock(Instant::now()),
@@ -334,6 +347,7 @@ mod tests {
334347
call_type: CallType::TaskLongPoll,
335348
call_name,
336349
retry_cfg: TEST_RETRY_CONFIG,
350+
retry_short_circuit: None,
337351
},
338352
TEST_RETRY_CONFIG,
339353
FixedClock(Instant::now()),
@@ -359,6 +373,7 @@ mod tests {
359373
call_type: CallType::TaskLongPoll,
360374
call_name: POLL_WORKFLOW_METH_NAME,
361375
retry_cfg: TEST_RETRY_CONFIG,
376+
retry_short_circuit: None,
362377
},
363378
RetryConfig {
364379
initial_interval: Duration::from_millis(2),
@@ -389,6 +404,25 @@ mod tests {
389404
}
390405
}
391406

407+
#[tokio::test]
408+
async fn retry_short_circuit() {
409+
let mut err_handler = TonicErrorHandler::new_with_clock(
410+
CallInfo {
411+
call_type: CallType::TaskLongPoll,
412+
call_name: POLL_WORKFLOW_METH_NAME,
413+
retry_cfg: TEST_RETRY_CONFIG,
414+
retry_short_circuit: Some(NoRetryOnMatching {
415+
predicate: |s: &Status| s.code() == Code::ResourceExhausted,
416+
}),
417+
},
418+
TEST_RETRY_CONFIG,
419+
FixedClock(Instant::now()),
420+
FixedClock(Instant::now()),
421+
);
422+
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
423+
assert_matches!(result, RetryPolicy::ForwardError(_))
424+
}
425+
392426
#[rstest::rstest]
393427
#[tokio::test]
394428
async fn task_poll_retries_forever<R>(

0 commit comments

Comments
 (0)