11use crate :: {
2- Client , IsWorkerTaskLongPoll , NamespacedClient , NoRetryOnMatching , Result , RetryConfig ,
3- raw:: IsUserLongPoll ,
2+ Client , IsWorkerTaskLongPoll , MESSAGE_TOO_LARGE_KEY , NamespacedClient , NoRetryOnMatching ,
3+ Result , RetryConfig , raw:: IsUserLongPoll ,
44} ;
55use backoff:: { Clock , SystemClock , backoff:: Backoff , exponential:: ExponentialBackoff } ;
66use futures_retry:: { ErrorHandler , FutureRetry , RetryPolicy } ;
@@ -201,7 +201,11 @@ where
201201{
202202 type OutError = tonic:: Status ;
203203
204- fn handle ( & mut self , current_attempt : usize , e : tonic:: Status ) -> RetryPolicy < tonic:: Status > {
204+ fn handle (
205+ & mut self ,
206+ current_attempt : usize ,
207+ mut e : tonic:: Status ,
208+ ) -> RetryPolicy < tonic:: Status > {
205209 // 0 max retries means unlimited retries
206210 if self . max_retries > 0 && current_attempt >= self . max_retries {
207211 return RetryPolicy :: ForwardError ( e) ;
@@ -213,6 +217,24 @@ where
213217 }
214218 }
215219
220+ // Short circuit if message is too large - this is not retryable
221+ if e. code ( ) == Code :: ResourceExhausted
222+ && ( e
223+ . message ( )
224+ . starts_with ( "grpc: received message larger than max" )
225+ || e. message ( )
226+ . starts_with ( "grpc: message after decompression larger than max" )
227+ || e. message ( )
228+ . starts_with ( "grpc: received message after decompression larger than max" ) )
229+ {
230+ // Leave a marker so we don't have duplicate detection logic in the workflow
231+ e. metadata_mut ( ) . insert (
232+ MESSAGE_TOO_LARGE_KEY ,
233+ tonic:: metadata:: MetadataValue :: from ( 0 ) ,
234+ ) ;
235+ return RetryPolicy :: ForwardError ( e) ;
236+ }
237+
216238 // Task polls are OK with being cancelled or running into the timeout because there's
217239 // nothing to do but retry anyway
218240 let long_poll_allowed = self . call_type == CallType :: TaskLongPoll
@@ -423,6 +445,47 @@ mod tests {
423445 assert_matches ! ( result, RetryPolicy :: ForwardError ( _) )
424446 }
425447
448+ #[ tokio:: test]
449+ async fn message_too_large_not_retried ( ) {
450+ let mut err_handler = TonicErrorHandler :: new_with_clock (
451+ CallInfo {
452+ call_type : CallType :: TaskLongPoll ,
453+ call_name : POLL_WORKFLOW_METH_NAME ,
454+ retry_cfg : TEST_RETRY_CONFIG ,
455+ retry_short_circuit : None ,
456+ } ,
457+ TEST_RETRY_CONFIG ,
458+ FixedClock ( Instant :: now ( ) ) ,
459+ FixedClock ( Instant :: now ( ) ) ,
460+ ) ;
461+ let result = err_handler. handle (
462+ 1 ,
463+ Status :: new (
464+ Code :: ResourceExhausted ,
465+ "grpc: received message larger than max" ,
466+ ) ,
467+ ) ;
468+ assert_matches ! ( result, RetryPolicy :: ForwardError ( _) ) ;
469+
470+ let result = err_handler. handle (
471+ 1 ,
472+ Status :: new (
473+ Code :: ResourceExhausted ,
474+ "grpc: message after decompression larger than max" ,
475+ ) ,
476+ ) ;
477+ assert_matches ! ( result, RetryPolicy :: ForwardError ( _) ) ;
478+
479+ let result = err_handler. handle (
480+ 1 ,
481+ Status :: new (
482+ Code :: ResourceExhausted ,
483+ "grpc: received message after decompression larger than max" ,
484+ ) ,
485+ ) ;
486+ assert_matches ! ( result, RetryPolicy :: ForwardError ( _) ) ;
487+ }
488+
426489 #[ rstest:: rstest]
427490 #[ tokio:: test]
428491 async fn task_poll_retries_forever < R > (
0 commit comments