Skip to content

Commit c2c7b10

Browse files
committed
Don't retry on grpc message too large errors
1 parent 8e5ae54 commit c2c7b10

3 files changed

Lines changed: 253 additions & 108 deletions

File tree

client/src/retry.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,11 @@ where
201201
{
202202
type OutError = tonic::Status;
203203

204-
fn handle(&mut self, current_attempt: usize, e: tonic::Status) -> RetryPolicy<tonic::Status> {
204+
fn handle(
205+
&mut self,
206+
current_attempt: usize,
207+
mut e: tonic::Status,
208+
) -> RetryPolicy<tonic::Status> {
205209
// 0 max retries means unlimited retries
206210
if self.max_retries > 0 && current_attempt >= self.max_retries {
207211
return RetryPolicy::ForwardError(e);
@@ -213,6 +217,17 @@ where
213217
}
214218
}
215219

220+
// Short circuit if message is too large - this is not retryable
221+
if e.code() == Code::ResourceExhausted
222+
&& e.message()
223+
.contains("grpc: received message larger than max")
224+
{
225+
// Leave a marker so we don't have duplicate detection logic in the workflow
226+
e.metadata_mut()
227+
.insert("message-too-large", tonic::metadata::MetadataValue::from(0));
228+
return RetryPolicy::ForwardError(e);
229+
}
230+
216231
// Task polls are OK with being cancelled or running into the timeout because there's
217232
// nothing to do but retry anyway
218233
let long_poll_allowed = self.call_type == CallType::TaskLongPoll
@@ -423,6 +438,26 @@ mod tests {
423438
assert_matches!(result, RetryPolicy::ForwardError(_))
424439
}
425440

441+
#[tokio::test]
442+
async fn message_too_large_not_retried() {
443+
let mut err_handler = TonicErrorHandler::new_with_clock(
444+
CallInfo {
445+
call_type: CallType::TaskLongPoll,
446+
call_name: POLL_WORKFLOW_METH_NAME,
447+
retry_cfg: TEST_RETRY_CONFIG,
448+
retry_short_circuit: None,
449+
},
450+
TEST_RETRY_CONFIG,
451+
FixedClock(Instant::now()),
452+
FixedClock(Instant::now()),
453+
);
454+
let result = err_handler.handle(
455+
1,
456+
Status::new(Code::ResourceExhausted, "grpc: received message larger than max"),
457+
);
458+
assert_matches!(result, RetryPolicy::ForwardError(_));
459+
}
460+
426461
#[rstest::rstest]
427462
#[tokio::test]
428463
async fn task_poll_retries_forever<R>(

core/src/worker/workflow/mod.rs

Lines changed: 180 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod workflow_stream;
1414
pub(crate) use driven_workflow::DrivenWorkflow;
1515
pub(crate) use history_update::HistoryUpdate;
1616

17+
use crate::protosext::ValidPollWFTQResponse;
1718
use crate::{
1819
MetricsContext,
1920
abstractions::{
@@ -317,6 +318,180 @@ impl Workflows {
317318
}
318319
}
319320

321+
async fn handle_activation_completed_success(
322+
&self,
323+
run_id: &str,
324+
wft_from_complete: &mut Option<ValidPollWFTQResponse>,
325+
completion_time: Instant,
326+
report: ServerCommandsWithWorkflowInfo,
327+
) -> WFTReportStatus {
328+
match report {
329+
ServerCommandsWithWorkflowInfo {
330+
task_token,
331+
action:
332+
ActivationAction::WftComplete {
333+
mut commands,
334+
messages,
335+
query_responses,
336+
force_new_wft,
337+
sdk_metadata,
338+
mut versioning_behavior,
339+
},
340+
} => {
341+
let reserved_act_permits =
342+
self.reserve_activity_slots_for_outgoing_commands(commands.as_mut_slice());
343+
debug!(commands=%commands.display(), query_responses=%query_responses.display(),
344+
messages=%messages.display(), force_new_wft,
345+
"Sending responses to server");
346+
if let Some(default_vb) = self.default_versioning_behavior.as_ref() {
347+
if versioning_behavior == VersioningBehavior::Unspecified {
348+
versioning_behavior = *default_vb;
349+
}
350+
}
351+
let mut completion = WorkflowTaskCompletion {
352+
task_token: task_token.clone(),
353+
commands,
354+
messages,
355+
query_responses,
356+
sticky_attributes: None,
357+
return_new_workflow_task: true,
358+
force_create_new_workflow_task: force_new_wft,
359+
sdk_metadata,
360+
metering_metadata: MeteringMetadata {
361+
nonfirst_local_activity_execution_attempts: self
362+
.local_act_mgr
363+
.get_nonfirst_attempt_count(run_id)
364+
as u32,
365+
},
366+
versioning_behavior,
367+
};
368+
let sticky_attrs = self.sticky_attrs.clone();
369+
// Do not return new WFT if we would not cache, because returned new WFTs are
370+
// always partial.
371+
if sticky_attrs.is_none() {
372+
completion.return_new_workflow_task = false;
373+
}
374+
completion.sticky_attributes = sticky_attrs;
375+
376+
let mut reset_last_started_to = None;
377+
self.handle_wft_reporting_errs(run_id, || async {
378+
match self.client.complete_workflow_task(completion).await {
379+
Ok(response) => {
380+
if response.reset_history_event_id > 0 {
381+
reset_last_started_to = Some(response.reset_history_event_id);
382+
}
383+
if let Some(wft) = response.workflow_task {
384+
*wft_from_complete = Some(validate_wft(wft)?);
385+
}
386+
self.handle_eager_activities(
387+
reserved_act_permits,
388+
response.activity_tasks,
389+
);
390+
}
391+
Err(e) if e.metadata().contains_key("message-too-large") => {
392+
let failure = Failure {
393+
failure: Some(
394+
temporal_sdk_core_protos::temporal::api::failure::v1::Failure {
395+
message: "GRPC Message too large".to_string(),
396+
..Default::default()
397+
},
398+
),
399+
force_cause: 0,
400+
};
401+
// TODO: tim - Update workflow cause when API is ready
402+
let new_outcome = FailedActivationWFTReport::Report(
403+
task_token,
404+
WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
405+
failure,
406+
);
407+
self.handle_activation_failed(run_id, completion_time, new_outcome)
408+
.await;
409+
}
410+
e => {
411+
e?;
412+
}
413+
};
414+
415+
Ok(())
416+
})
417+
.await;
418+
WFTReportStatus::Reported {
419+
reset_last_started_to,
420+
completion_time,
421+
}
422+
}
423+
ServerCommandsWithWorkflowInfo {
424+
task_token,
425+
action: ActivationAction::RespondLegacyQuery { result },
426+
} => {
427+
self.respond_legacy_query(task_token, *result).await;
428+
WFTReportStatus::Reported {
429+
reset_last_started_to: None,
430+
completion_time,
431+
}
432+
}
433+
}
434+
}
435+
436+
async fn handle_activation_failed(
437+
&self,
438+
run_id: &str,
439+
completion_time: Instant,
440+
outcome: FailedActivationWFTReport,
441+
) -> WFTReportStatus {
442+
match outcome {
443+
FailedActivationWFTReport::Report(tt, cause, failure) => {
444+
warn!(run_id=%run_id, failure=?failure, "Failing workflow task");
445+
self.handle_wft_reporting_errs(run_id, || async {
446+
self.client
447+
.fail_workflow_task(tt, cause, failure.failure)
448+
.await
449+
})
450+
.await;
451+
WFTReportStatus::Reported {
452+
reset_last_started_to: None,
453+
completion_time,
454+
}
455+
}
456+
FailedActivationWFTReport::ReportLegacyQueryFailure(task_token, failure) => {
457+
warn!(run_id=%run_id, failure=?failure, "Failing legacy query request");
458+
self.respond_legacy_query(task_token, legacy_query_failure(failure))
459+
.await;
460+
WFTReportStatus::Reported {
461+
reset_last_started_to: None,
462+
completion_time,
463+
}
464+
}
465+
}
466+
}
467+
468+
async fn handle_activation_completed_result(
469+
&self,
470+
run_id: &str,
471+
wft_from_complete: &mut Option<ValidPollWFTQResponse>,
472+
completion_outcome: ActivationCompleteResult,
473+
) -> WFTReportStatus {
474+
let completion_time = Instant::now();
475+
476+
match completion_outcome.outcome {
477+
ActivationCompleteOutcome::ReportWFTSuccess(report) => {
478+
self.handle_activation_completed_success(
479+
run_id,
480+
wft_from_complete,
481+
completion_time,
482+
report,
483+
)
484+
.await
485+
}
486+
ActivationCompleteOutcome::ReportWFTFail(outcome) => {
487+
self.handle_activation_failed(run_id, completion_time, outcome)
488+
.await
489+
}
490+
ActivationCompleteOutcome::WFTFailedDontReport => WFTReportStatus::DropWft,
491+
ActivationCompleteOutcome::DoNothing => WFTReportStatus::NotReported,
492+
}
493+
}
494+
320495
/// Queue an activation completion for processing, returning a future that will resolve with
321496
/// the outcome of that completion. See [ActivationCompletedOutcome].
322497
///
@@ -357,114 +532,12 @@ impl Workflows {
357532
);
358533
return Ok(());
359534
};
535+
let replaying = completion_outcome.replaying;
360536

361537
let mut wft_from_complete = None;
362-
let completion_time = Instant::now();
363-
let wft_report_status = match completion_outcome.outcome {
364-
ActivationCompleteOutcome::ReportWFTSuccess(report) => match report {
365-
ServerCommandsWithWorkflowInfo {
366-
task_token,
367-
action:
368-
ActivationAction::WftComplete {
369-
mut commands,
370-
messages,
371-
query_responses,
372-
force_new_wft,
373-
sdk_metadata,
374-
mut versioning_behavior,
375-
},
376-
} => {
377-
let reserved_act_permits =
378-
self.reserve_activity_slots_for_outgoing_commands(commands.as_mut_slice());
379-
debug!(commands=%commands.display(), query_responses=%query_responses.display(),
380-
messages=%messages.display(), force_new_wft,
381-
"Sending responses to server");
382-
if let Some(default_vb) = self.default_versioning_behavior.as_ref() {
383-
if versioning_behavior == VersioningBehavior::Unspecified {
384-
versioning_behavior = *default_vb;
385-
}
386-
}
387-
let mut completion = WorkflowTaskCompletion {
388-
task_token,
389-
commands,
390-
messages,
391-
query_responses,
392-
sticky_attributes: None,
393-
return_new_workflow_task: true,
394-
force_create_new_workflow_task: force_new_wft,
395-
sdk_metadata,
396-
metering_metadata: MeteringMetadata {
397-
nonfirst_local_activity_execution_attempts: self
398-
.local_act_mgr
399-
.get_nonfirst_attempt_count(&run_id)
400-
as u32,
401-
},
402-
versioning_behavior,
403-
};
404-
let sticky_attrs = self.sticky_attrs.clone();
405-
// Do not return new WFT if we would not cache, because returned new WFTs are
406-
// always partial.
407-
if sticky_attrs.is_none() {
408-
completion.return_new_workflow_task = false;
409-
}
410-
completion.sticky_attributes = sticky_attrs;
411-
412-
let mut reset_last_started_to = None;
413-
self.handle_wft_reporting_errs(&run_id, || async {
414-
let response = self.client.complete_workflow_task(completion).await?;
415-
if response.reset_history_event_id > 0 {
416-
reset_last_started_to = Some(response.reset_history_event_id);
417-
}
418-
if let Some(wft) = response.workflow_task {
419-
wft_from_complete = Some(validate_wft(wft)?);
420-
}
421-
self.handle_eager_activities(reserved_act_permits, response.activity_tasks);
422-
Ok(())
423-
})
424-
.await;
425-
WFTReportStatus::Reported {
426-
reset_last_started_to,
427-
completion_time,
428-
}
429-
}
430-
ServerCommandsWithWorkflowInfo {
431-
task_token,
432-
action: ActivationAction::RespondLegacyQuery { result },
433-
} => {
434-
self.respond_legacy_query(task_token, *result).await;
435-
WFTReportStatus::Reported {
436-
reset_last_started_to: None,
437-
completion_time,
438-
}
439-
}
440-
},
441-
ActivationCompleteOutcome::ReportWFTFail(outcome) => match outcome {
442-
FailedActivationWFTReport::Report(tt, cause, failure) => {
443-
warn!(run_id=%run_id, failure=?failure, "Failing workflow task");
444-
self.handle_wft_reporting_errs(&run_id, || async {
445-
self.client
446-
.fail_workflow_task(tt, cause, failure.failure)
447-
.await
448-
})
449-
.await;
450-
WFTReportStatus::Reported {
451-
reset_last_started_to: None,
452-
completion_time,
453-
}
454-
}
455-
FailedActivationWFTReport::ReportLegacyQueryFailure(task_token, failure) => {
456-
warn!(run_id=%run_id, failure=?failure, "Failing legacy query request");
457-
self.respond_legacy_query(task_token, legacy_query_failure(failure))
458-
.await;
459-
WFTReportStatus::Reported {
460-
reset_last_started_to: None,
461-
completion_time,
462-
}
463-
}
464-
},
465-
ActivationCompleteOutcome::WFTFailedDontReport => WFTReportStatus::DropWft,
466-
ActivationCompleteOutcome::DoNothing => WFTReportStatus::NotReported,
467-
};
538+
let wft_report_status = self
539+
.handle_activation_completed_result(&run_id, &mut wft_from_complete, completion_outcome)
540+
.await;
468541

469542
let maybe_pwft = if let Some(wft) = wft_from_complete {
470543
match HistoryPaginator::from_poll(wft, self.client.clone()).await {
@@ -485,7 +558,7 @@ impl Workflows {
485558
if let Some(h) = post_activate_hook {
486559
h(PostActivateHookData {
487560
run_id: &run_id,
488-
replaying: completion_outcome.replaying,
561+
replaying,
489562
});
490563
}
491564

0 commit comments

Comments
 (0)