Skip to content

Commit a68bec2

Browse files
Fix/nonce to low recovery (#41)
* fix: shutdown on background tasks * fix: nonce to low if gets out of sync recovery * fix: remove startup filtering
1 parent d71cd46 commit a68bec2

File tree

14 files changed

+381
-41
lines changed

14 files changed

+381
-41
lines changed

crates/cli/Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ start_base:
1616
RUST_BACKTRACE=1 cargo run -- start --path $(CURDIR)/../../playground/base
1717
start_sepolia:
1818
RUST_BACKTRACE=1 cargo run -- start --path $(CURDIR)/../../playground/sepolia
19-
sepolia_ethereum_fireblocks:
19+
start_sepolia_ethereum_fireblocks:
2020
RUST_BACKTRACE=1 cargo run -- start --path $(CURDIR)/../../playground/sepolia_ethereum_fireblocks
21+
start_e2e:
22+
RUST_BACKTRACE=1 cargo run -- start --path $(CURDIR)/../e2e-tests
2123

2224
auth_login:
2325
cargo run -- auth login

crates/core/src/background_tasks/automatic_top_up_task.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{
1111
relayer::Relayer,
1212
safe_proxy::SafeProxyManager,
1313
shared::common_types::EvmAddress,
14+
shutdown::subscribe_to_shutdown,
1415
transaction::queue_system::TransactionsQueues,
1516
yaml::{AllOrAddresses, Erc20TokenConfig, NativeTokenConfig, NetworkAutomaticTopUpConfig},
1617
SetupConfig,
@@ -66,6 +67,7 @@ impl AutomaticTopUpTask {
6667
info!("Starting automatic top-up background task");
6768

6869
self.refresh_relayer_cache().await;
70+
let mut shutdown_rx = subscribe_to_shutdown();
6971

7072
loop {
7173
tokio::select! {
@@ -75,6 +77,10 @@ impl AutomaticTopUpTask {
7577
_ = self.top_up_check_interval.tick() => {
7678
self.check_and_top_up_addresses().await;
7779
}
80+
_ = shutdown_rx.recv() => {
81+
info!("Shutdown signal received, stopping automatic top-up task");
82+
break;
83+
}
7884
}
7985
}
8086
}

crates/core/src/background_tasks/balance_monitor.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use tracing::{error, info, warn};
66

77
use crate::common_types::EvmAddress;
88
use crate::{
9-
network::ChainId, postgres::PostgresClient, provider::EvmProvider, webhooks::WebhookManager,
9+
network::ChainId, postgres::PostgresClient, provider::EvmProvider,
10+
shutdown::subscribe_to_shutdown, webhooks::WebhookManager,
1011
};
1112

1213
fn get_minimum_balance_threshold(chain_id: &ChainId) -> u128 {
@@ -28,19 +29,26 @@ pub async fn balance_monitor(
2829

2930
tokio::spawn(async move {
3031
let mut interval = tokio::time::interval(Duration::from_secs(600));
32+
let mut shutdown_rx = subscribe_to_shutdown();
3133

3234
loop {
33-
interval.tick().await;
34-
35-
info!("Starting balance monitoring check");
35+
tokio::select! {
36+
_ = interval.tick() => {
37+
info!("Starting balance monitoring check");
38+
39+
for provider in providers.iter() {
40+
if let Err(e) = check_balances_for_chain(provider, &db, &webhook_manager).await {
41+
error!("Failed to check balances for chain {}: {}", provider.chain_id, e);
42+
}
43+
}
3644

37-
for provider in providers.iter() {
38-
if let Err(e) = check_balances_for_chain(provider, &db, &webhook_manager).await {
39-
error!("Failed to check balances for chain {}: {}", provider.chain_id, e);
45+
info!("Completed balance monitoring check");
46+
}
47+
_ = shutdown_rx.recv() => {
48+
info!("Shutdown signal received, stopping balance monitor");
49+
break;
4050
}
4151
}
42-
43-
info!("Completed balance monitoring check");
4452
}
4553
});
4654

crates/core/src/background_tasks/webhook_manager_task.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{provider::EvmProvider, webhooks::WebhookManager};
1+
use crate::{provider::EvmProvider, shutdown::subscribe_to_shutdown, webhooks::WebhookManager};
22
use std::sync::Arc;
33
use tokio::sync::Mutex;
44
use tracing::info;
@@ -34,6 +34,8 @@ async fn run_webhook_background_tasks(webhook_manager: Arc<Mutex<WebhookManager>
3434

3535
info!("Starting webhook background processing loops");
3636

37+
let mut shutdown_rx = subscribe_to_shutdown();
38+
3739
loop {
3840
tokio::select! {
3941
_ = retry_interval.tick() => {
@@ -68,6 +70,10 @@ async fn run_webhook_background_tasks(webhook_manager: Arc<Mutex<WebhookManager>
6870
_ = database_cleanup_interval.tick() => {
6971
cleanup_webhook_database_history(webhook_manager.clone()).await;
7072
}
73+
_ = shutdown_rx.recv() => {
74+
info!("Shutdown signal received, stopping webhook manager");
75+
break;
76+
}
7177
}
7278
}
7379
}

crates/core/src/gas/blob_gas_oracle.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::{collections::HashMap, sync::Arc};
22

3-
use crate::{network::ChainId, provider::EvmProvider, transaction::types::TransactionSpeed};
3+
use crate::{
4+
network::ChainId, provider::EvmProvider, shutdown::subscribe_to_shutdown,
5+
transaction::types::TransactionSpeed,
6+
};
47
use serde::{Deserialize, Serialize};
58
use tokio::{
69
sync::Mutex,
@@ -130,16 +133,24 @@ pub async fn blob_gas_oracle(
130133

131134
tokio::spawn(async move {
132135
let mut interval = time::interval(Duration::from_secs(5));
136+
let mut shutdown_rx = subscribe_to_shutdown();
137+
133138
loop {
134-
interval.tick().await;
135-
136-
let blob_gas_price_result = provider.calculate_ethereum_blob_gas_price().await;
137-
if let Ok(blob_gas_price) = blob_gas_price_result {
138-
cache
139-
.lock()
140-
.await
141-
.update_blob_gas_price(provider.chain_id, blob_gas_price)
142-
.await;
139+
tokio::select! {
140+
_ = interval.tick() => {
141+
let blob_gas_price_result = provider.calculate_ethereum_blob_gas_price().await;
142+
if let Ok(blob_gas_price) = blob_gas_price_result {
143+
cache
144+
.lock()
145+
.await
146+
.update_blob_gas_price(provider.chain_id, blob_gas_price)
147+
.await;
148+
}
149+
}
150+
_ = shutdown_rx.recv() => {
151+
info!("Shutdown signal received, stopping blob gas oracle for provider: {}", provider.name);
152+
break;
153+
}
143154
}
144155
}
145156
});

crates/core/src/gas/gas_oracle.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::{collections::HashMap, sync::Arc};
22

33
use super::fee_estimator::{GasEstimatorResult, GasPriceResult};
4-
use crate::{network::ChainId, provider::EvmProvider, transaction::types::TransactionSpeed};
4+
use crate::{
5+
network::ChainId, provider::EvmProvider, shutdown::subscribe_to_shutdown,
6+
transaction::types::TransactionSpeed,
7+
};
58
use tokio::{
69
sync::Mutex,
710
time::{self, Duration},
@@ -91,16 +94,24 @@ pub async fn gas_oracle(
9194

9295
tokio::spawn(async move {
9396
let mut interval = time::interval(Duration::from_secs(5));
94-
loop {
95-
interval.tick().await;
97+
let mut shutdown_rx = subscribe_to_shutdown();
9698

97-
let gas_price_result = provider.calculate_gas_price().await;
98-
match gas_price_result {
99-
Ok(gas_price) => {
100-
cache.lock().await.update_gas_price(provider.chain_id, gas_price).await;
99+
loop {
100+
tokio::select! {
101+
_ = interval.tick() => {
102+
let gas_price_result = provider.calculate_gas_price().await;
103+
match gas_price_result {
104+
Ok(gas_price) => {
105+
cache.lock().await.update_gas_price(provider.chain_id, gas_price).await;
106+
}
107+
Err(err) => {
108+
error!("Failed to get gas price for provider: {} - error {} - try again in 10s", provider.name, err);
109+
}
110+
}
101111
}
102-
Err(err) => {
103-
error!("Failed to get gas price for provider: {} - error {} - try again in 10s", provider.name, err);
112+
_ = shutdown_rx.recv() => {
113+
info!("Shutdown signal received, stopping gas oracle for provider: {}", provider.name);
114+
break;
104115
}
105116
}
106117
}

crates/core/src/transaction/db/write.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use crate::{
88
utils::option_if,
99
},
1010
transaction::types::{
11-
Transaction, TransactionData, TransactionHash, TransactionId, TransactionStatus,
12-
TransactionValue,
11+
Transaction, TransactionData, TransactionHash, TransactionId, TransactionNonce,
12+
TransactionStatus, TransactionValue,
1313
},
1414
};
1515
use alloy::network::AnyTransactionReceipt;
@@ -427,6 +427,47 @@ impl PostgresClient {
427427
Ok(())
428428
}
429429

430+
pub async fn transaction_update_nonce(
431+
&mut self,
432+
transaction_id: &TransactionId,
433+
nonce: &TransactionNonce,
434+
) -> Result<(), PostgresError> {
435+
let mut conn = self.pool.get().await?;
436+
let trans = conn.transaction().await.map_err(PostgresError::PgError)?;
437+
438+
trans
439+
.execute(
440+
"UPDATE relayer.transaction SET nonce = $2 WHERE id = $1",
441+
&[&transaction_id, &(nonce.into_inner() as i64)],
442+
)
443+
.await?;
444+
445+
trans
446+
.execute(
447+
"
448+
INSERT INTO relayer.transaction_audit_log (
449+
id, relayer_id, \"to\", \"from\", nonce, chain_id, data, value, blobs, gas_limit,
450+
speed, status, expires_at, queued_at, sent_at, mined_at, confirmed_at,
451+
failed_at, failed_reason, hash, sent_max_priority_fee_per_gas,
452+
sent_max_fee_per_gas, gas_price, block_hash, block_number, external_id
453+
)
454+
SELECT
455+
id, relayer_id, \"to\", \"from\", $2, chain_id, data, value, blobs, gas_limit,
456+
speed, status, expires_at, queued_at, sent_at, mined_at, confirmed_at,
457+
failed_at, failed_reason, hash, sent_max_priority_fee_per_gas,
458+
sent_max_fee_per_gas, gas_price, block_hash, block_number, external_id
459+
FROM relayer.transaction
460+
WHERE id = $1;
461+
",
462+
&[&transaction_id, &(nonce.into_inner() as i64)],
463+
)
464+
.await?;
465+
466+
trans.commit().await?;
467+
468+
Ok(())
469+
}
470+
430471
pub async fn transaction_expired(
431472
&mut self,
432473
transaction_id: &TransactionId,

crates/core/src/transaction/queue_system/transactions_queue.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,4 +1398,30 @@ impl TransactionsQueue {
13981398
let address = self.relay_address();
13991399
self.evm_provider.get_balance(&address).await
14001400
}
1401+
1402+
pub async fn update_pending_transaction_nonce(
1403+
&self,
1404+
transaction_id: &TransactionId,
1405+
new_nonce: TransactionNonce,
1406+
) {
1407+
let mut pending = self.pending_transactions.lock().await;
1408+
if let Some(transaction) = pending.iter_mut().find(|tx| tx.id == *transaction_id) {
1409+
transaction.nonce = new_nonce;
1410+
}
1411+
}
1412+
1413+
pub async fn update_inmempool_transaction_nonce(
1414+
&self,
1415+
transaction_id: &TransactionId,
1416+
new_nonce: TransactionNonce,
1417+
) {
1418+
let mut inmempool = self.inmempool_transactions.lock().await;
1419+
if let Some(competitive_tx) =
1420+
inmempool.iter_mut().find(|ctx| ctx.get_transaction_by_id(transaction_id).is_some())
1421+
{
1422+
if let Some(transaction) = competitive_tx.get_transaction_by_id_mut(transaction_id) {
1423+
transaction.nonce = new_nonce;
1424+
}
1425+
}
1426+
}
14011427
}

0 commit comments

Comments
 (0)