Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CLI_VERSION_OVERRIDE = "v1.3.1-priority.0"
[alias]
integ-test = ["run", "--package", "temporal-sdk-core", "--example", "integ_runner", "--"]
lint = ["clippy", "--workspace", "--examples", "--all-features",
"--test", "integ_tests", "--test", "heavy_tests", "--", "--D", "warnings"]
"--test", "integ_tests", "--test", "heavy_tests", "--test", "manual_tests",
"--", "--D", "warnings"]
test-lint = ["clippy", "--all", "--all-features", "--examples", "--workspace",
"--tests", "--", "--D", "warnings"]
15 changes: 15 additions & 0 deletions .cargo/multi-worker-manual-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

# Use this script during manual testing against a server to run multiple instances
# of a test, thus acting like multiple workers are running concurrently

# Usage: multi-worker-heavy-test <number of workers> <integ test name>

# You may want to set env vars for targeting cloud first, ex (fish syntax):
# set -gx TEMPORAL_SERVICE_ADDRESS "https://sj-poller-test.e2e.tmprl-test.cloud:7233"
# set -gx TEMPORAL_USE_TLS 1
# set -gx TEMPORAL_NAMESPACE sj-poller-test.e2e

cargo integ-test -c "--release" -t manual_tests --just-build

parallel --line-buffer --process-slot-var=PAR_JOBNUM -j $1 cargo integ-test -c "--release" -s external -t manual_tests -- --nocapture $2 ::: $(seq 1 $1)
2 changes: 1 addition & 1 deletion .github/workflows/per-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ concurrency:
cancel-in-progress: true

jobs:
build-and-test:
build-and-lint:
name: "Format, docs, and lint"
timeout-minutes: 10
runs-on: ubuntu-latest
Expand Down
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ Cargo.lock
/.idea/
*.iml
*~
.aider*

# Ignore generated protobuf files
src/protos/*.rs
!src/protos/mod.rs

# Coverage
/tarpaulin-report.html
/machine_coverage/
/bindings/
/core/machine_coverage/

# Keep secrets here
/.cloud_certs/
.aider*
cloud_envs.fish
38 changes: 31 additions & 7 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use temporal_sdk_core_protos::{
},
};
use tonic::{
Code, Status,
Code,
body::BoxBody,
client::GrpcService,
codegen::InterceptedService,
Expand All @@ -83,11 +83,6 @@ use tower::ServiceBuilder;
use url::Url;
use uuid::Uuid;

/// A request extension that, when set, should make the [RetryClient] consider this call to be a
/// [CallType::TaskLongPoll]
#[derive(Copy, Clone, Debug)]
pub struct IsWorkerTaskLongPoll;

static CLIENT_NAME_HEADER_KEY: &str = "client-name";
static CLIENT_VERSION_HEADER_KEY: &str = "client-version";
static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
Expand Down Expand Up @@ -259,6 +254,18 @@ impl RetryConfig {
}
}

/// A retry policy that never retires
pub const fn no_retries() -> Self {
Self {
initial_interval: Duration::from_secs(0),
randomization_factor: 0.0,
multiplier: 1.0,
max_interval: Duration::from_secs(0),
max_elapsed_time: None,
max_retries: 1,
}
}

pub(crate) fn into_exp_backoff<C>(self, clock: C) -> exponential::ExponentialBackoff<C> {
exponential::ExponentialBackoff {
current_interval: self.initial_interval,
Expand All @@ -279,6 +286,20 @@ impl From<RetryConfig> for ExponentialBackoff {
}
}

/// A request extension that, when set, should make the [RetryClient] consider this call to be a
/// [CallType::TaskLongPoll]
#[derive(Copy, Clone, Debug)]
pub struct IsWorkerTaskLongPoll;

/// A request extension that, when set, and a call is being processed by a [RetryClient], allows the
/// caller to request certain matching errors to short-circuit-return immediately and not follow
/// normal retry logic.
#[derive(Copy, Clone, Debug)]
pub struct NoRetryOnMatching {
/// Return true if the passed-in gRPC error should be immediately returned to the caller
pub predicate: fn(&tonic::Status) -> bool,
}

impl Debug for ClientTlsConfig {
// Intentionally omit details here since they could leak a key if ever printed
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -514,7 +535,10 @@ pub struct ServiceCallInterceptor {
impl Interceptor for ServiceCallInterceptor {
/// This function will get called on each outbound request. Returning a `Status` here will
/// cancel the request and have that status returned to the caller.
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
let metadata = request.metadata_mut();
if !metadata.contains_key(CLIENT_NAME_HEADER_KEY) {
metadata.insert(
Expand Down
2 changes: 1 addition & 1 deletion client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,7 @@ mod tests {
.filter(|l| l.starts_with("rpc"))
.map(|l| {
let stripped = l.strip_prefix("rpc ").unwrap();
(stripped[..stripped.find('(').unwrap()]).trim()
stripped[..stripped.find('(').unwrap()].trim()
})
.collect();
let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();
Expand Down
42 changes: 38 additions & 4 deletions client/src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
Client, IsWorkerTaskLongPoll, NamespacedClient, Result, RetryConfig, raw::IsUserLongPoll,
Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result, RetryConfig,
raw::IsUserLongPoll,
};
use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
Expand Down Expand Up @@ -58,13 +59,16 @@ impl<SG> RetryClient<SG> {
request: Option<&Request<R>>,
) -> CallInfo {
let mut call_type = CallType::Normal;
let mut retry_short_circuit = None;
if let Some(r) = request.as_ref() {
let ext = r.extensions();
if ext.get::<IsUserLongPoll>().is_some() {
call_type = CallType::UserLongPoll;
} else if ext.get::<IsWorkerTaskLongPoll>().is_some() {
call_type = CallType::TaskLongPoll;
}

retry_short_circuit = ext.get::<NoRetryOnMatching>().cloned();
}
let retry_cfg = if call_type == CallType::TaskLongPoll {
RetryConfig::task_poll_retry_policy()
Expand All @@ -75,6 +79,7 @@ impl<SG> RetryClient<SG> {
call_type,
call_name,
retry_cfg,
retry_short_circuit,
}
}

Expand Down Expand Up @@ -111,6 +116,7 @@ pub(crate) struct TonicErrorHandler<C: Clock> {
call_type: CallType,
call_name: &'static str,
have_retried_goaway_cancel: bool,
retry_short_circuit: Option<NoRetryOnMatching>,
}
impl TonicErrorHandler<SystemClock> {
fn new(call_info: CallInfo, throttle_cfg: RetryConfig) -> Self {
Expand Down Expand Up @@ -139,6 +145,7 @@ where
backoff: call_info.retry_cfg.into_exp_backoff(clock),
throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock),
have_retried_goaway_cancel: false,
retry_short_circuit: call_info.retry_short_circuit,
}
}

Expand All @@ -164,11 +171,12 @@ where
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug)]
pub(crate) struct CallInfo {
pub call_type: CallType,
call_name: &'static str,
retry_cfg: RetryConfig,
retry_short_circuit: Option<NoRetryOnMatching>,
}

#[doc(hidden)]
Expand All @@ -187,8 +195,6 @@ impl CallType {
}
}

// TODO: This ought to be configurable by the owner of the client. That way worker-specific concerns
// don't need to exist in this crate.
impl<C> ErrorHandler<tonic::Status> for TonicErrorHandler<C>
where
C: Clock,
Expand All @@ -201,6 +207,12 @@ where
return RetryPolicy::ForwardError(e);
}

if let Some(sc) = self.retry_short_circuit.as_ref() {
if (sc.predicate)(&e) {
return RetryPolicy::ForwardError(e);
}
}

// Task polls are OK with being cancelled or running into the timeout because there's
// nothing to do but retry anyway
let long_poll_allowed = self.call_type == CallType::TaskLongPoll
Expand Down Expand Up @@ -307,6 +319,7 @@ mod tests {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
Expand Down Expand Up @@ -334,6 +347,7 @@ mod tests {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
Expand All @@ -359,6 +373,7 @@ mod tests {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
RetryConfig {
initial_interval: Duration::from_millis(2),
Expand Down Expand Up @@ -389,6 +404,25 @@ mod tests {
}
}

#[tokio::test]
async fn retry_short_circuit() {
let mut err_handler = TonicErrorHandler::new_with_clock(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: Some(NoRetryOnMatching {
predicate: |s: &Status| s.code() == Code::ResourceExhausted,
}),
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
assert_matches!(result, RetryPolicy::ForwardError(_))
}

#[rstest::rstest]
#[tokio::test]
async fn task_poll_retries_forever<R>(
Expand Down
Loading
Loading