diff --git a/kms-connector/crates/kms-worker/src/core/event_processor/kms_client.rs b/kms-connector/crates/kms-worker/src/core/event_processor/kms_client.rs index dae9595956..9adcb1b210 100644 --- a/kms-connector/crates/kms-worker/src/core/event_processor/kms_client.rs +++ b/kms-connector/crates/kms-worker/src/core/event_processor/kms_client.rs @@ -242,24 +242,22 @@ where Fut: Future, Status>>, { for i in 1..=retries { - match request_fn() - .await - .map_err(ProcessingError::from_request_status) - { + match request_fn().await { Ok(_) => break, - Err(ProcessingError::Irrecoverable(e)) => { + Err(e) if e.code() == Code::AlreadyExists => return Ok(()), + Err(e) if [Code::ResourceExhausted, Code::Unknown].contains(&e.code()) => { CORE_REQUEST_SENT_ERRORS.inc(); - return Err(ProcessingError::Irrecoverable(e)); - } - Err(ProcessingError::Recoverable(e)) => { - CORE_REQUEST_SENT_ERRORS.inc(); - warn!("#{}/{} GRPC request attempt failed: {}", i, retries, e); + warn!("#{i}/{retries} GRPC request attempt failed: {e}"); if i == retries { return Err(ProcessingError::Recoverable(anyhow!( "All GRPC requests failed!" ))); } } + Err(e) => { + CORE_REQUEST_SENT_ERRORS.inc(); + return Err(ProcessingError::Irrecoverable(e.into())); + } } } CORE_REQUEST_SENT_COUNTER.inc(); @@ -287,7 +285,7 @@ where return Ok(response); } Err(status) => { - if status.code() == Code::Unavailable { + if [Code::Unavailable, Code::Unknown].contains(&status.code()) { // Check if we've exceeded the timeout if start.elapsed() >= timeout { CORE_RESPONSE_ERRORS.inc(); diff --git a/kms-connector/crates/kms-worker/src/core/event_processor/processor.rs b/kms-connector/crates/kms-worker/src/core/event_processor/processor.rs index 4ffa9a92f9..9d8a3aa362 100644 --- a/kms-connector/crates/kms-worker/src/core/event_processor/processor.rs +++ b/kms-connector/crates/kms-worker/src/core/event_processor/processor.rs @@ -121,20 +121,11 @@ impl DbEventProcessor

{ } impl ProcessingError { - /// Converts GRPC status of a request sent to the KMS into a `ProcessingError`. - pub fn from_request_status(value: tonic::Status) -> Self { - let anyhow_error = anyhow!("KMS GRPC error: {value}"); - match value.code() { - Code::ResourceExhausted => Self::Recoverable(anyhow_error), - _ => Self::Irrecoverable(anyhow_error), - } - } - /// Converts GRPC status of the polling of a KMS Response into a `ProcessingError`. pub fn from_response_status(value: tonic::Status) -> Self { let anyhow_error = anyhow!("KMS GRPC error: {value}"); match value.code() { - Code::NotFound | Code::Unavailable | Code::ResourceExhausted => { + Code::DeadlineExceeded | Code::Unavailable | Code::ResourceExhausted => { Self::Recoverable(anyhow_error) } _ => Self::Irrecoverable(anyhow_error),