Skip to content

Commit 26826fc

Browse files
authored
feat(relayer): in-app retry logic (#605)
* feat: adjust exponential backoff logic and introduce max age for worker messages * feat: retry with an explicit delay for worker messages * chore: update bridge sdk version
1 parent 95f9084 commit 26826fc

File tree

11 files changed

+117
-68
lines changed

11 files changed

+117
-68
lines changed

omni-relayer/Cargo.lock

Lines changed: 16 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

omni-relayer/Cargo.toml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "omni-relayer"
3-
version = "0.5.21"
3+
version = "0.5.22"
44
edition = "2024"
55
resolver = "2"
66
repository = "https://github.com/Near-One/omni-bridge"
@@ -51,19 +51,19 @@ redis = { version = "0.32.5", features = ["aio", "tokio-comp", "connection-manag
5151
async-nats = "0.45"
5252
reqwest = "0.12"
5353

54-
bridge-connector-common = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "bridge-connector-common", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
55-
near-rpc-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "near-rpc-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
54+
bridge-connector-common = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "bridge-connector-common", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
55+
near-rpc-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "near-rpc-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
5656

57-
near-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "near-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
58-
evm-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "evm-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
59-
solana-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "solana-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
60-
starknet-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "starknet-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
61-
utxo-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "utxo-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
62-
utxo-utils = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "utxo-utils", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
63-
wormhole-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "wormhole-bridge-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
64-
light-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "light-client", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
57+
near-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "near-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
58+
evm-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "evm-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
59+
solana-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "solana-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
60+
starknet-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "starknet-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
61+
utxo-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "utxo-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
62+
utxo-utils = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "utxo-utils", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
63+
wormhole-bridge-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "wormhole-bridge-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
64+
light-client = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "light-client", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
6565

66-
omni-connector = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "omni-connector", rev = "ee86758afa5ee7ab3863bedb562342ace1d08d7d" }
66+
omni-connector = { git = "https://github.com/Near-One/bridge-sdk-rs", package = "omni-connector", rev = "19f6cc07f1802497aabee9c943358a16c3a6d901" }
6767

6868
# The profile that 'dist' will build with
6969
[profile.dist]

omni-relayer/example-devnet-config.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ name = "relayer-worker"
2424
stream = "RELAYER"
2525
subject = "relayer.tasks.>"
2626
max_deliver = -1
27-
backoff_secs = [60, 120, 300, 900, 3600]
27+
ack_wait = 300
28+
max_backoff_hours = 16
29+
max_message_age_hours = 96
2830
worker_count = 20
2931

3032
[bridge_indexer]

omni-relayer/example-mainnet-config.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ keep_transfers_for_secs = 604800 # 60 * 60 * 24 * 7
2525
# stream = "RELAYER"
2626
# subject = "relayer.tasks.>"
2727
# max_deliver = -1
28-
# backoff_secs = [60, 120, 300, 900, 3600]
28+
# ack_wait = 300
29+
# max_backoff_hours = 16
30+
# max_message_age_hours = 96
2931
# worker_count = 20
3032

3133
[bridge_indexer]

omni-relayer/example-testnet-config.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ name = "relayer-worker"
2424
stream = "RELAYER"
2525
subject = "relayer.tasks.>"
2626
max_deliver = -1
27-
backoff_secs = [60, 120, 300, 900, 3600]
27+
ack_wait = 300
28+
max_backoff_hours = 16
29+
max_message_age_hours = 96
2830
worker_count = 20
2931

3032
[bridge_indexer]

omni-relayer/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,13 @@ pub struct NatsConsumer {
221221
pub stream: String,
222222
pub subject: String,
223223
pub max_deliver: i64,
224+
#[serde(default)]
224225
pub backoff_secs: Vec<u64>,
225226
#[serde(default = "default_worker_count")]
226227
pub worker_count: usize,
228+
pub ack_wait: u64,
229+
pub max_backoff_hours: u64,
230+
pub max_message_age_hours: u64,
227231
}
228232

229233
fn default_worker_count() -> usize {

omni-relayer/src/startup/bridge_indexer.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,25 +101,21 @@ fn is_whitelisted_transaction_event(
101101
transfer_message: &OmniTransferMessage,
102102
) -> bool {
103103
match transfer_message {
104-
OmniTransferMessage::NearTransferMessage(transfer_message) => {
105-
config
106-
.bridge_indexer
107-
.is_token_whitelisted(&transfer_message.token)
108-
}
109-
OmniTransferMessage::NearSignTransferEvent(sign_event) => {
110-
config
111-
.bridge_indexer
112-
.is_token_whitelisted(&sign_event.message_payload.token_address)
113-
}
114-
OmniTransferMessage::EvmInitTransferMessage(init_transfer) => {
115-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
116-
}
117-
OmniTransferMessage::SolanaInitTransfer(init_transfer) => {
118-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
119-
}
120-
OmniTransferMessage::StarknetInitTransfer(init_transfer) => {
121-
config.bridge_indexer.is_token_whitelisted(&init_transfer.token)
122-
}
104+
OmniTransferMessage::NearTransferMessage(transfer_message) => config
105+
.bridge_indexer
106+
.is_token_whitelisted(&transfer_message.token),
107+
OmniTransferMessage::NearSignTransferEvent(sign_event) => config
108+
.bridge_indexer
109+
.is_token_whitelisted(&sign_event.message_payload.token_address),
110+
OmniTransferMessage::EvmInitTransferMessage(init_transfer) => config
111+
.bridge_indexer
112+
.is_token_whitelisted(&init_transfer.token),
113+
OmniTransferMessage::SolanaInitTransfer(init_transfer) => config
114+
.bridge_indexer
115+
.is_token_whitelisted(&init_transfer.token),
116+
OmniTransferMessage::StarknetInitTransfer(init_transfer) => config
117+
.bridge_indexer
118+
.is_token_whitelisted(&init_transfer.token),
123119
OmniTransferMessage::NearUtxoTransferMessage { token_id, .. } => config
124120
.bridge_indexer
125121
.is_token_whitelisted(&OmniAddress::Near(token_id.clone())),
@@ -167,7 +163,11 @@ async fn handle_transaction_event(
167163
event: OmniTransactionEvent,
168164
) -> Result<()> {
169165
if config.bridge_indexer.is_whitelist_active()
170-
&& !is_whitelisted_transaction_event(config, event.transfer_id.origin_chain, &event.transfer_message)
166+
&& !is_whitelisted_transaction_event(
167+
config,
168+
event.transfer_id.origin_chain,
169+
&event.transfer_message,
170+
)
171171
{
172172
debug!(
173173
"Whitelist mode active, skipping transaction event: {:?}",

omni-relayer/src/startup/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ fn build_utxo_bridge_client<C: utxo_bridge_client::types::UTXOChain>(
177177
}
178178
};
179179

180-
Ok(utxo.as_ref()
180+
Ok(utxo
181+
.as_ref()
181182
.map(|utxo| UTXOBridgeClient::new(utxo.rpc_http_url.clone(), AuthOptions::None)))
182183
}
183184

omni-relayer/src/utils/nats.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ impl NatsClient {
5050
}
5151

5252
pub async fn relayer_consumer(&self, config: &config::Nats) -> Result<consumer::PullConsumer> {
53+
let ack_wait = Duration::from_secs(config.relayer_consumer.ack_wait);
54+
5355
self.jetstream
5456
.create_consumer_strict_on_stream(
5557
consumer::pull::Config {
@@ -58,12 +60,7 @@ impl NatsClient {
5860
deliver_policy: consumer::DeliverPolicy::Last,
5961
max_deliver: config.relayer_consumer.max_deliver,
6062
filter_subject: config.relayer_consumer.subject.clone(),
61-
backoff: config
62-
.relayer_consumer
63-
.backoff_secs
64-
.iter()
65-
.map(|&s| Duration::from_secs(s))
66-
.collect(),
63+
ack_wait,
6764
..Default::default()
6865
},
6966
&config.relayer_consumer.stream,

omni-relayer/src/workers/evm.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use anyhow::{Context, Result};
44
use bridge_connector_common::result::{BridgeSdkError, EthRpcError};
@@ -51,7 +51,9 @@ pub async fn process_init_transfer_event(
5151
let current_timestamp = chrono::Utc::now().timestamp();
5252

5353
if current_timestamp < creation_timestamp + expected_finalization_time {
54-
return Ok(EventAction::Retry);
54+
let remaining =
55+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
56+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
5557
}
5658

5759
info!(
@@ -326,7 +328,9 @@ pub async fn process_evm_transfer_event(
326328
let current_timestamp = chrono::Utc::now().timestamp();
327329

328330
if current_timestamp < creation_timestamp + expected_finalization_time {
329-
return Ok(EventAction::Retry);
331+
let remaining =
332+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
333+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
330334
}
331335

332336
info!("Processing FinTransfer ({chain_kind:?}): {transaction_hash:?}");
@@ -463,7 +467,9 @@ pub async fn process_deploy_token_event(
463467
let current_timestamp = chrono::Utc::now().timestamp();
464468

465469
if current_timestamp < creation_timestamp + expected_finalization_time {
466-
return Ok(EventAction::Retry);
470+
let remaining =
471+
(creation_timestamp + expected_finalization_time - current_timestamp) as u64;
472+
return Ok(EventAction::RetryAfter(Duration::from_secs(remaining)));
467473
}
468474

469475
info!("Processing DeployToken ({chain_kind:?}): {transaction_hash:?}");

0 commit comments

Comments
 (0)