From e59c96b970ff909d7a51989641c19065fd4d7270 Mon Sep 17 00:00:00 2001 From: Tiburso Date: Tue, 15 Apr 2025 17:21:23 +0100 Subject: [PATCH] feat: nack the messages in case something fails mid processing --- .../consumer/rabbitmq/consumer.rs | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/relay/src/task_event_consumer/consumer/rabbitmq/consumer.rs b/relay/src/task_event_consumer/consumer/rabbitmq/consumer.rs index 4637e9f..15db438 100644 --- a/relay/src/task_event_consumer/consumer/rabbitmq/consumer.rs +++ b/relay/src/task_event_consumer/consumer/rabbitmq/consumer.rs @@ -217,6 +217,18 @@ impl TaskEventConsumer for RabbitMQTaskEventConsumer { Ok(msg) => msg, Err(e) => { error!(error = %e, "Error parsing message"); + + // Nack the message when parsing fails + if let Err(nack_err) = channel + .basic_nack(delivery_tag, lapin::options::BasicNackOptions::default()) + .await + { + error!( + error = %nack_err, + delivery_tag = %delivery_tag, + "Failed to nack message after parsing error" + ); + } continue; } }; @@ -224,10 +236,22 @@ impl TaskEventConsumer for RabbitMQTaskEventConsumer { // Handle the event. If it fails, we log it, nack it, and continue. if let Err(e) = self.handle_events(vec![event]).await { error!(error = %e, "Error handling events"); + + // Nack the message when handling fails + if let Err(nack_err) = channel + .basic_nack(delivery_tag, lapin::options::BasicNackOptions::default()) + .await + { + error!( + error = %nack_err, + delivery_tag = %delivery_tag, + "Failed to nack message after handling error" + ); + } continue; } - // Ackowledge the message so we don't re-process it. + // Acknowledge the message when processing is successful debug!(queue = %QUEUE_NAME, delivery_tag = %delivery_tag, "Acknowledging message"); if let Err(e) = channel .basic_ack(delivery_tag, BasicAckOptions::default())