diff --git a/src/stream.rs b/src/stream.rs index 1a664be..9a9445f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -220,20 +220,9 @@ where let lag_milliseconds = (Utc::now() - checkpoint_event.id.created_at).num_milliseconds(); metrics::record_processing_lag(self.config.id, lag_milliseconds); - let result = self - .sink + self.sink .publish_events(vec![(*checkpoint_event).clone()]) - .await; - - if let Err(err) = result { - warn!( - checkpoint_event_id = %checkpoint_event.id.id, - current_batch_event_id = %current_batch_event_id.id, - error = %err, - "Failed to publish checkpoint event during failover recovery; skipping replay" - ); - return Ok(()); - } + .await?; info!( "Sink recovered, starting failover replay from checkpoint event id: {:?}",