Skip to content

Commit 6b859b4

Browse files
committed
feat: retry with an explicit delay for worker messages
1 parent f5b11c8 commit 6b859b4

File tree

4 files changed

+43
-29
lines changed

4 files changed

+43
-29
lines changed

omni-relayer/src/startup/bridge_indexer.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,25 +101,21 @@ fn is_whitelisted_transaction_event(
101101
transfer_message: &OmniTransferMessage,
102102
) -> bool {
103103
match transfer_message {
104-
OmniTransferMessage::NearTransferMessage(transfer_message) => {
105-
config
106-
.bridge_indexer
107-
.is_token_whitelisted(&transfer_message.token)
108-
}
109-
OmniTransferMessage::NearSignTransferEvent(sign_event) => {
110-
config
111-
.bridge_indexer
112-
.is_token_whitelisted(&sign_event.message_payload.token_address)
113-
}
114-
OmniTransferMessage::EvmInitTransferMessage(init_transfer) => {
115-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
116-
}
117-
OmniTransferMessage::SolanaInitTransfer(init_transfer) => {
118-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
119-
}
120-
OmniTransferMessage::StarknetInitTransfer(init_transfer) => {
121-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
122-
}
104+
OmniTransferMessage::NearTransferMessage(transfer_message) => config
105+
.bridge_indexer
106+
.is_token_whitelisted(&transfer_message.token),
107+
OmniTransferMessage::NearSignTransferEvent(sign_event) => config
108+
.bridge_indexer
109+
.is_token_whitelisted(&sign_event.message_payload.token_address),
110+
OmniTransferMessage::EvmInitTransferMessage(init_transfer) => config
111+
.bridge_indexer
112+
.is_token_whitelisted(&init_transfer.token),
113+
OmniTransferMessage::SolanaInitTransfer(init_transfer) => config
114+
.bridge_indexer
115+
.is_token_whitelisted(&init_transfer.token),
116+
OmniTransferMessage::StarknetInitTransfer(init_transfer) => config
117+
.bridge_indexer
118+
.is_token_whitelisted(&init_transfer.token),
123119
OmniTransferMessage::NearUtxoTransferMessage { token_id, .. } => config
124120
.bridge_indexer
125121
.is_token_whitelisted(&OmniAddress::Near(token_id.clone())),
@@ -167,7 +163,11 @@ async fn handle_transaction_event(
167163
event: OmniTransactionEvent,
168164
) -> Result<()> {
169165
if config.bridge_indexer.is_whitelist_active()
170-
&& !is_whitelisted_transaction_event(config, event.transfer_id.origin_chain, &event.transfer_message)
166+
&& !is_whitelisted_transaction_event(
167+
config,
168+
event.transfer_id.origin_chain,
169+
&event.transfer_message,
170+
)
171171
{
172172
debug!(
173173
"Whitelist mode active, skipping transaction event: {:?}",

omni-relayer/src/startup/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ fn build_utxo_bridge_client<C: utxo_bridge_client::types::UTXOChain>(
177177
}
178178
};
179179

180-
Ok(utxo.as_ref()
180+
Ok(utxo
181+
.as_ref()
181182
.map(|utxo| UTXOBridgeClient::new(utxo.rpc_http_url.clone(), AuthOptions::None)))
182183
}
183184

omni-relayer/src/workers/evm.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use anyhow::{Context, Result};
44
use bridge_connector_common::result::{BridgeSdkError, EthRpcError};
@@ -51,7 +51,9 @@ pub async fn process_init_transfer_event(
5151
let current_timestamp = chrono::Utc::now().timestamp();
5252

5353
if current_timestamp < creation_timestamp + expected_finalization_time {
54-
return Ok(EventAction::Retry);
54+
let remaining =
55+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
56+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
5557
}
5658

5759
info!(
@@ -326,7 +328,9 @@ pub async fn process_evm_transfer_event(
326328
let current_timestamp = chrono::Utc::now().timestamp();
327329

328330
if current_timestamp < creation_timestamp + expected_finalization_time {
329-
return Ok(EventAction::Retry);
331+
let remaining =
332+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
333+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
330334
}
331335

332336
info!("Processing FinTransfer ({chain_kind:?}): {transaction_hash:?}");
@@ -463,7 +467,9 @@ pub async fn process_deploy_token_event(
463467
let current_timestamp = chrono::Utc::now().timestamp();
464468

465469
if current_timestamp < creation_timestamp + expected_finalization_time {
466-
return Ok(EventAction::Retry);
470+
let remaining =
471+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
472+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
467473
}
468474

469475
info!("Processing DeployToken ({chain_kind:?}): {transaction_hash:?}");

omni-relayer/src/workers/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl<E> RetryableEvent<E> {
5656

5757
pub enum EventAction {
5858
Retry,
59+
RetryAfter(Duration),
5960
Remove,
6061
}
6162

@@ -176,7 +177,7 @@ async fn handle_nats_ack(
176177
let max_message_age = Duration::from_secs(config.max_message_age_hours * 3600);
177178

178179
match result {
179-
Ok(EventAction::Retry) => {
180+
Ok(EventAction::Retry) | Ok(EventAction::RetryAfter(_)) => {
180181
if let Ok(info) = msg.info() {
181182
let now = chrono::Utc::now().timestamp();
182183
let published_at = info.published.unix_timestamp();
@@ -190,8 +191,11 @@ async fn handle_nats_ack(
190191
return;
191192
}
192193

193-
let backoff = Duration::from_secs(4u64.saturating_pow(info.delivered as u32))
194-
.min(max_backoff);
194+
let backoff = if let Ok(EventAction::RetryAfter(delay)) = result {
195+
(*delay).min(max_backoff)
196+
} else {
197+
Duration::from_secs(4u64.saturating_pow(info.delivered as u32)).min(max_backoff)
198+
};
195199
msg.ack_with(async_nats::jetstream::AckKind::Nak(Some(backoff)))
196200
.await
197201
.ok();
@@ -326,7 +330,10 @@ pub async fn process_events(
326330
}
327331

328332
if message_result.needs_evm_nonce_resync
329-
&& matches!(message_result.action, Ok(EventAction::Retry) | Err(_))
333+
&& matches!(
334+
message_result.action,
335+
Ok(EventAction::Retry) | Ok(EventAction::RetryAfter(_)) | Err(_)
336+
)
330337
{
331338
is_evm_nonce_resync_needed.store(true, Ordering::Relaxed);
332339
}

0 commit comments

Comments
 (0)