Skip to content

Commit 8060969

Browse files
committed
Add additional error messages, share key, add ApplicationFailureInfo
1 parent 9322c79 commit 8060969

4 files changed

Lines changed: 50 additions & 11 deletions

File tree

client/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ use uuid::Uuid;
8686
static CLIENT_NAME_HEADER_KEY: &str = "client-name";
8787
static CLIENT_VERSION_HEADER_KEY: &str = "client-version";
8888
static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
89+
90+
/// Key used to communicate when a GRPC message is too large
91+
pub static MESSAGE_TOO_LARGE_KEY: &str = "message-too-large";
92+
8993
/// The server times out polls after 60 seconds. Set our timeout to be slightly beyond that.
9094
const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70);
9195
const OTHER_CALL_TIMEOUT: Duration = Duration::from_secs(30);

client/src/retry.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result, RetryConfig,
3-
raw::IsUserLongPoll,
2+
Client, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY, NamespacedClient, NoRetryOnMatching,
3+
Result, RetryConfig, raw::IsUserLongPoll,
44
};
55
use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
66
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -219,14 +219,19 @@ where
219219

220220
// Short circuit if message is too large - this is not retryable
221221
if e.code() == Code::ResourceExhausted
222-
&& (e.message()
223-
.starts_with("grpc: received message larger than max") ||
224-
e.message().starts_with("grpc: message after decompression larger than max") ||
225-
e.message().starts_with("grpc: received message after decompression larger than max")
222+
&& (e
223+
.message()
224+
.starts_with("grpc: received message larger than max")
225+
|| e.message()
226+
.starts_with("grpc: message after decompression larger than max")
227+
|| e.message()
228+
.starts_with("grpc: received message after decompression larger than max"))
226229
{
227230
// Leave a marker so we don't have duplicate detection logic in the workflow
228-
e.metadata_mut()
229-
.insert("message-too-large", tonic::metadata::MetadataValue::from(0));
231+
e.metadata_mut().insert(
232+
MESSAGE_TOO_LARGE_KEY,
233+
tonic::metadata::MetadataValue::from(0),
234+
);
230235
return RetryPolicy::ForwardError(e);
231236
}
232237

@@ -455,7 +460,28 @@ mod tests {
455460
);
456461
let result = err_handler.handle(
457462
1,
458-
Status::new(Code::ResourceExhausted, "received message larger than max"),
463+
Status::new(
464+
Code::ResourceExhausted,
465+
"grpc: received message larger than max",
466+
),
467+
);
468+
assert_matches!(result, RetryPolicy::ForwardError(_));
469+
470+
let result = err_handler.handle(
471+
1,
472+
Status::new(
473+
Code::ResourceExhausted,
474+
"grpc: message after decompression larger than max",
475+
),
476+
);
477+
assert_matches!(result, RetryPolicy::ForwardError(_));
478+
479+
let result = err_handler.handle(
480+
1,
481+
Status::new(
482+
Code::ResourceExhausted,
483+
"grpc: received message after decompression larger than max",
484+
),
459485
);
460486
assert_matches!(result, RetryPolicy::ForwardError(_));
461487
}

core/src/worker/workflow/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use std::{
5757
thread,
5858
time::{Duration, Instant},
5959
};
60+
use temporal_client::MESSAGE_TOO_LARGE_KEY;
6061
use temporal_sdk_core_api::{
6162
errors::{CompleteWfError, PollError},
6263
worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind},
@@ -78,6 +79,7 @@ use temporal_sdk_core_protos::{
7879
command::v1::{Command as ProtoCommand, Command, command::Attributes},
7980
common::v1::{Memo, MeteringMetadata, RetryPolicy, SearchAttributes, WorkflowExecution},
8081
enums::v1::{VersioningBehavior, WorkflowTaskFailedCause},
82+
failure::v1::{ApplicationFailureInfo, failure::FailureInfo},
8183
protocol::v1::Message as ProtocolMessage,
8284
query::v1::WorkflowQuery,
8385
sdk::v1::{UserMetadata, WorkflowTaskCompletedMetadata},
@@ -388,11 +390,18 @@ impl Workflows {
388390
response.activity_tasks,
389391
);
390392
}
391-
Err(e) if e.metadata().contains_key("message-too-large") => {
393+
Err(e) if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) => {
392394
let failure = Failure {
393395
failure: Some(
394396
temporal_sdk_core_protos::temporal::api::failure::v1::Failure {
395397
message: "GRPC Message too large".to_string(),
398+
failure_info: Some(FailureInfo::ApplicationFailureInfo(
399+
ApplicationFailureInfo {
400+
r#type: "GrpcMessageTooLarge".to_string(),
401+
non_retryable: true,
402+
..Default::default()
403+
},
404+
)),
396405
..Default::default()
397406
},
398407
),

tests/integ_tests/worker_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() {
159159
worker.run_until_done().await.unwrap();
160160
}
161161

162-
static OVERSIZE_GRPC_MESSAGE_RUN: AtomicBool = AtomicBool::new(false);
163162
#[tokio::test]
164163
async fn oversize_grpc_message() {
165164
let wf_name = "oversize_grpc_message";
166165
let mut starter = CoreWfStarter::new(wf_name);
167166
starter.worker_config.no_remote_activities(true);
168167
let mut core = starter.worker().await;
169168

169+
static OVERSIZE_GRPC_MESSAGE_RUN: AtomicBool = AtomicBool::new(false);
170170
core.register_wf(wf_name.to_owned(), |_ctx: WfContext| async move {
171171
if OVERSIZE_GRPC_MESSAGE_RUN.load(Relaxed) {
172172
Ok(vec![].into())

0 commit comments

Comments
 (0)