Skip to content

Commit efbcc7c

Browse files
committed
fix: pr review
1 parent d68b2b6 commit efbcc7c

2 files changed

Lines changed: 29 additions & 25 deletions

File tree

rust/main/agents/relayer/src/msg/pending_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ impl PendingMessage {
864864
// exceeded. Without this check, a small max_retries value (e.g. 3) would still
865865
// hit the fixed early-backoff arms (1 => 5s, 2 => 10s, ...) rather than dropping.
866866
// Normal relayer messages do NOT set fail_fast and continue to the long-backoff arm.
867-
if self.fail_fast && self.num_retries > self.max_retries {
867+
if self.fail_fast && self.num_retries >= self.max_retries {
868868
warn!(
869869
message_id = ?self.message.id(),
870870
num_retries = self.num_retries,

rust/main/agents/relayer/src/relay_api/handlers.rs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ async fn create_relay(
560560
})?
561561
.clone();
562562

563+
// 3 retries is intentional for the relay API fast path — if it fails the
564+
// contract indexer will re-queue it within seconds.
563565
let pending_msg = PendingMessage::maybe_from_persisted_retries(
564566
extracted.message.clone(),
565567
msg_ctx.clone(),
@@ -587,29 +589,9 @@ async fn create_relay(
587589
});
588590
}
589591

590-
// Phase 2: all messages validated — now commit side effects.
591-
// Insert into the dedup cache only here, after extraction and all validation passed.
592-
// Inserting earlier would block retries on transient extraction failures (RPC timeout,
593-
// block not yet indexed) since the cache TTL is 5 minutes.
594-
if let Some(cache) = &state.tx_hash_cache {
595-
let mut cache = cache.write().await;
596-
match cache.check_and_insert(req.origin_chain.clone(), req.tx_hash.clone()) {
597-
Ok(()) => {}
598-
Err(TxHashCacheError::CacheFull) => {
599-
state.record_failure("cache_full");
600-
return Err(ServerError::ServiceUnavailable(
601-
"Service temporarily unavailable".to_string(),
602-
));
603-
}
604-
Err(TxHashCacheError::Duplicate) => {
605-
state.record_failure("duplicate_tx");
606-
return Err(ServerError::TooManyRequests(
607-
"Transaction already submitted recently".to_string(),
608-
));
609-
}
610-
}
611-
}
612-
592+
// Phase 2: send all messages first, then commit the dedup key only on full success.
593+
// Inserting into the dedup cache before knowing whether sends succeed would prevent
594+
// clients from retrying if the processor channel is unavailable.
613595
let mut send_failed = false;
614596
for v in validated {
615597
if let Err(e) = v
@@ -656,13 +638,35 @@ async fn create_relay(
656638
});
657639
}
658640

659-
if processed_messages.is_empty() && send_failed {
641+
// Any send failure is non-200 so the client knows to retry. The dedup key is not
642+
// committed in this case, so the client is free to resubmit the same tx hash.
643+
if send_failed {
660644
state.record_failure("send_failed");
661645
return Err(ServerError::InternalError(
662646
"Failed to send messages to processor".to_string(),
663647
));
664648
}
665649

650+
// All sends succeeded — now lock the tx hash to prevent replays.
651+
if let Some(cache) = &state.tx_hash_cache {
652+
let mut cache = cache.write().await;
653+
match cache.check_and_insert(req.origin_chain.clone(), req.tx_hash.clone()) {
654+
Ok(()) => {}
655+
Err(TxHashCacheError::CacheFull) => {
656+
state.record_failure("cache_full");
657+
return Err(ServerError::ServiceUnavailable(
658+
"Service temporarily unavailable".to_string(),
659+
));
660+
}
661+
Err(TxHashCacheError::Duplicate) => {
662+
state.record_failure("duplicate_tx");
663+
return Err(ServerError::TooManyRequests(
664+
"Transaction already submitted recently".to_string(),
665+
));
666+
}
667+
}
668+
}
669+
666670
state.record_success();
667671

668672
// 4. Return success with all processed messages

0 commit comments

Comments
 (0)