Skip to content

Commit 984da13

Browse files
authored
Set WFT Failed cause on grpc message too large incidents (#1229)
1 parent 703fe6b commit 984da13

3 files changed

Lines changed: 24 additions & 32 deletions

File tree

crates/sdk-core/src/core_tests/queries.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,7 @@ async fn legacy_query_too_large_responds_with_failure() {
695695
assert!(
696696
matches!(&result, LegacyQueryResult::Failed(f) if
697697
f.failure.as_ref().unwrap().message == "GRPC Message too large"
698+
&& f.force_cause() == WorkflowTaskFailedCause::GrpcMessageTooLarge
698699
),
699700
"Expected query failure with message-too-large, got: {:?}",
700701
matches!(&result, LegacyQueryResult::Failed(_)),

crates/sdk-core/src/core_tests/workflow_tasks.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3109,6 +3109,8 @@ async fn grpc_message_too_large_doesnt_spam_task_fails() {
31093109
Ok(Default::default())
31103110
}
31113111
}));
3112+
mh.expect_fail_wft_matcher =
3113+
Box::new(|_, cause, _| *cause == WorkflowTaskFailedCause::GrpcMessageTooLarge);
31123114

31133115
let mut mock = build_mock_pollers(mh);
31143116
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -416,22 +416,7 @@ impl Workflows {
416416
Err(e)
417417
if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) && attempt < 2 =>
418418
{
419-
let failure = Failure {
420-
failure: Some(
421-
temporalio_common::protos::temporal::api::failure::v1::Failure {
422-
message: "GRPC Message too large".to_string(),
423-
failure_info: Some(FailureInfo::ApplicationFailureInfo(
424-
ApplicationFailureInfo {
425-
r#type: "GrpcMessageTooLarge".to_string(),
426-
non_retryable: true,
427-
..Default::default()
428-
},
429-
)),
430-
..Default::default()
431-
},
432-
),
433-
force_cause: 0,
434-
};
419+
let failure = make_grpc_message_too_large_failure();
435420
let new_outcome = FailedActivationWFTReport::Report(
436421
task_token,
437422
WorkflowTaskFailedCause::GrpcMessageTooLarge,
@@ -860,22 +845,7 @@ impl Workflows {
860845
}
861846
Err(e) if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) => {
862847
warn!(error=%e, "Query response too large, responding with failure");
863-
let failure = Failure {
864-
failure: Some(
865-
temporalio_common::protos::temporal::api::failure::v1::Failure {
866-
message: "GRPC Message too large".to_string(),
867-
failure_info: Some(FailureInfo::ApplicationFailureInfo(
868-
ApplicationFailureInfo {
869-
r#type: "GrpcMessageTooLarge".to_string(),
870-
non_retryable: true,
871-
..Default::default()
872-
},
873-
)),
874-
..Default::default()
875-
},
876-
),
877-
force_cause: 0,
878-
};
848+
let failure = make_grpc_message_too_large_failure();
879849
if let Err(e2) = self
880850
.client
881851
.respond_legacy_query(tt, LegacyQueryResult::Failed(failure))
@@ -1739,6 +1709,25 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
17391709
});
17401710
}
17411711

1712+
fn make_grpc_message_too_large_failure() -> Failure {
1713+
Failure {
1714+
failure: Some(
1715+
temporalio_common::protos::temporal::api::failure::v1::Failure {
1716+
message: "GRPC Message too large".to_string(),
1717+
failure_info: Some(FailureInfo::ApplicationFailureInfo(
1718+
ApplicationFailureInfo {
1719+
r#type: "GrpcMessageTooLarge".to_string(),
1720+
non_retryable: true,
1721+
..Default::default()
1722+
},
1723+
)),
1724+
..Default::default()
1725+
},
1726+
),
1727+
force_cause: WorkflowTaskFailedCause::GrpcMessageTooLarge as i32,
1728+
}
1729+
}
1730+
17421731
#[cfg(test)]
17431732
mod tests {
17441733
use super::*;

0 commit comments

Comments
 (0)