Skip to content

Commit 5494850

Browse files
committed
fix(kms-connector): poll KMS response if request already exists
1 parent dfea184 commit 5494850

File tree

2 files changed

+10
-21
lines changed

2 files changed

+10
-21
lines changed

kms-connector/crates/kms-worker/src/core/event_processor/kms_client.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -242,24 +242,22 @@ where
242242
Fut: Future<Output = Result<Response<Empty>, Status>>,
243243
{
244244
for i in 1..=retries {
245-
match request_fn()
246-
.await
247-
.map_err(ProcessingError::from_request_status)
248-
{
245+
match request_fn().await {
249246
Ok(_) => break,
250-
Err(ProcessingError::Irrecoverable(e)) => {
247+
Err(e) if e.code() == Code::AlreadyExists => return Ok(()),
248+
Err(e) if [Code::ResourceExhausted, Code::Unknown].contains(&e.code()) => {
251249
CORE_REQUEST_SENT_ERRORS.inc();
252-
return Err(ProcessingError::Irrecoverable(e));
253-
}
254-
Err(ProcessingError::Recoverable(e)) => {
255-
CORE_REQUEST_SENT_ERRORS.inc();
256-
warn!("#{}/{} GRPC request attempt failed: {}", i, retries, e);
250+
warn!("#{i}/{retries} GRPC request attempt failed: {e}");
257251
if i == retries {
258252
return Err(ProcessingError::Recoverable(anyhow!(
259253
"All GRPC requests failed!"
260254
)));
261255
}
262256
}
257+
Err(e) => {
258+
CORE_REQUEST_SENT_ERRORS.inc();
259+
return Err(ProcessingError::Irrecoverable(e.into()));
260+
}
263261
}
264262
}
265263
CORE_REQUEST_SENT_COUNTER.inc();
@@ -287,7 +285,7 @@ where
287285
return Ok(response);
288286
}
289287
Err(status) => {
290-
if status.code() == Code::Unavailable {
288+
if [Code::Unavailable, Code::Unknown].contains(&status.code()) {
291289
// Check if we've exceeded the timeout
292290
if start.elapsed() >= timeout {
293291
CORE_RESPONSE_ERRORS.inc();

kms-connector/crates/kms-worker/src/core/event_processor/processor.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -121,20 +121,11 @@ impl<P: Provider> DbEventProcessor<P> {
121121
}
122122

123123
impl ProcessingError {
124-
/// Converts GRPC status of a request sent to the KMS into a `ProcessingError`.
125-
pub fn from_request_status(value: tonic::Status) -> Self {
126-
let anyhow_error = anyhow!("KMS GRPC error: {value}");
127-
match value.code() {
128-
Code::ResourceExhausted => Self::Recoverable(anyhow_error),
129-
_ => Self::Irrecoverable(anyhow_error),
130-
}
131-
}
132-
133124
/// Converts GRPC status of the polling of a KMS Response into a `ProcessingError`.
134125
pub fn from_response_status(value: tonic::Status) -> Self {
135126
let anyhow_error = anyhow!("KMS GRPC error: {value}");
136127
match value.code() {
137-
Code::NotFound | Code::Unavailable | Code::ResourceExhausted => {
128+
Code::DeadlineExceeded | Code::Unavailable | Code::ResourceExhausted => {
138129
Self::Recoverable(anyhow_error)
139130
}
140131
_ => Self::Irrecoverable(anyhow_error),

0 commit comments

Comments
 (0)