Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3cd2438
feat: add a bakcground task that updates the finalized nonce periodic…
kamiyaa Oct 10, 2025
6d161ba
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 13, 2025
4d0e79d
feat: update import
kamiyaa Oct 14, 2025
92182d5
Merge branch 'jeff/nonce-update' of github.com:hyperlane-xyz/hyperlan…
kamiyaa Oct 14, 2025
52d36bf
feat: unify logic
kamiyaa Oct 14, 2025
c1e19d7
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 14, 2025
07906f5
feat: add log message
kamiyaa Oct 16, 2025
3d91aed
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 16, 2025
d7e2db4
feat: add new methods to AdaptsChain
kamiyaa Oct 17, 2025
907d1e5
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 17, 2025
839e2f4
feat: get payloads instead of tx and build txs
kamiyaa Oct 17, 2025
8862ce1
Merge branch 'jeff/nonce-update' of github.com:hyperlane-xyz/hyperlan…
kamiyaa Oct 17, 2025
72114a9
feat: fix comment
kamiyaa Oct 17, 2025
80e88a4
feat: use get_tracked_tx
kamiyaa Oct 17, 2025
0f9f94f
feat: update log
kamiyaa Oct 17, 2025
4eef7e0
feat: fix nonce off by one
kamiyaa Oct 17, 2025
7a6e4af
feat: add more logs
kamiyaa Oct 17, 2025
3c97b84
feat: check for default uuid
kamiyaa Oct 17, 2025
3107996
feat: add tests
kamiyaa Oct 17, 2025
5af19b9
feat: fix tests
kamiyaa Oct 20, 2025
544aea5
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 20, 2025
9be4a9a
feat: fix tests
kamiyaa Oct 20, 2025
a1bb370
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 21, 2025
c2c66de
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 21, 2025
7f9968a
Merge branch 'main' into jeff/nonce-update
kamiyaa Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions rust/main/lander/src/adapter/chains/ethereum/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,60 @@ impl AdaptsChain for EthereumAdapter {
Ok(reverted)
}

fn reprocess_txs_poll_rate(&self) -> Option<Duration> {
// if the block time is too short, we want to cap it at 5s because we don't want
// to query the nonce too much. 5s should be quick enough for a reorg
Some((*self.estimated_block_time()).max(Duration::from_secs(5)))
}

async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError> {
let old_finalized_nonce = self
.nonce_manager
.state
.get_finalized_nonce()
.await?
.unwrap_or_default();
self.nonce_manager.nonce_updater.update_boundaries().await?;
let new_finalized_nonce = self
.nonce_manager
.state
.get_finalized_nonce()
.await?
.unwrap_or_default();

if new_finalized_nonce >= old_finalized_nonce {
return Ok(Vec::new());
}

warn!(
?old_finalized_nonce,
?new_finalized_nonce,
"New finalized nonce is lower than old finalized nonce"
);

let mut txs = Vec::new();
let mut nonce = new_finalized_nonce.saturating_add(U256::one());
while nonce <= old_finalized_nonce {
let tx_uuid = self.nonce_manager.state.get_tracked_tx_uuid(&nonce).await?;
if tx_uuid == TransactionUuid::default() {
debug!(
?nonce,
"No tracked transaction UUID for nonce in reorg range"
);
} else if let Some(tx) = self.nonce_manager.state.get_tracked_tx(&tx_uuid).await? {
txs.push(tx);
} else {
debug!(
?nonce,
?tx_uuid,
"No transaction found for nonce in reorg range"
);
}
nonce = nonce.saturating_add(U256::one());
}
Ok(txs)
}

fn estimated_block_time(&self) -> &std::time::Duration {
&self.estimated_block_time
}
Expand Down
3 changes: 1 addition & 2 deletions rust/main/lander/src/adapter/chains/ethereum/nonce.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) use manager::NonceManager;
pub(crate) use updater::NonceUpdater;

mod db;
mod error;
Expand All @@ -11,8 +12,6 @@ mod updater;
pub(crate) use db::NonceDb;
#[cfg(test)]
pub(crate) use state::NonceManagerState;
#[cfg(test)]
pub(crate) use updater::NonceUpdater;

#[cfg(test)]
mod tests;
10 changes: 8 additions & 2 deletions rust/main/lander/src/adapter/chains/ethereum/nonce/manager.rs
Comment thread
kamiyaa marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;

use ethers::signers::Signer;
use ethers_core::types::Address;
Expand Down Expand Up @@ -41,8 +42,13 @@ impl NonceManager {
let tx_db = db.clone() as Arc<dyn TransactionDb>;
let state = Arc::new(NonceManagerState::new(nonce_db, tx_db, address, metrics));

let nonce_updater =
NonceUpdater::new(address, reorg_period, block_time, provider, state.clone());
let nonce_updater = NonceUpdater::new(
address,
reorg_period,
block_time,
provider.clone(),
state.clone(),
);

let manager = Self {
address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl NonceManagerState {
Ok((finalized_nonce, upper_nonce))
}

pub(super) async fn get_tracked_tx(
pub async fn get_tracked_tx(
&self,
tx_uuid: &TransactionUuid,
) -> NonceResult<Option<Transaction>> {
Expand Down Expand Up @@ -47,7 +47,7 @@ impl NonceManagerState {
Ok(())
}

pub(super) async fn get_tracked_tx_uuid(&self, nonce: &U256) -> NonceResult<TransactionUuid> {
pub async fn get_tracked_tx_uuid(&self, nonce: &U256) -> NonceResult<TransactionUuid> {
let tx_uuid = self
.nonce_db
.retrieve_transaction_uuid_by_nonce_and_signer_address(nonce, &self.address)
Expand All @@ -65,7 +65,7 @@ impl NonceManagerState {
Ok(())
}

pub(super) async fn get_finalized_nonce(&self) -> NonceResult<Option<U256>> {
pub async fn get_finalized_nonce(&self) -> NonceResult<Option<U256>> {
let finalized_nonce = self
.nonce_db
.retrieve_finalized_nonce_by_signer_address(&self.address)
Expand Down
19 changes: 19 additions & 0 deletions rust/main/lander/src/adapter/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,23 @@ pub trait AdaptsChain: Send + Sync {
async fn replace_tx(&self, _tx: &Transaction) -> Result<(), LanderError> {
todo!()
}

/// Returns the polling interval for checking if transactions need reprocessing.
///
/// Returns `None` if the adapter does not support transaction reprocessing,
/// or `Some(Duration)` specifying how frequently to poll.
fn reprocess_txs_poll_rate(&self) -> Option<Duration> {
None
}

/// Get a list of transactions that need to be reprocessed.
///
/// Returns an empty vector if no transactions need reprocessing or if the adapter
/// does not support reprocessing.
///
/// Note: Implementations may update internal state (e.g., finalized nonce boundaries)
/// as part of determining which transactions need reprocessing.
async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError> {
Ok(Vec::new())
}
}
43 changes: 43 additions & 0 deletions rust/main/lander/src/dispatcher/stages/inclusion_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl InclusionStage {
domain,
} = self;
let futures = vec![
tokio::spawn(
Self::receive_reprocess_txs(domain.clone(), pool.clone(), state.clone())
.instrument(info_span!("receive_reprocess_txs")),
),
tokio::spawn(
Self::receive_txs(tx_receiver, pool.clone(), state.clone(), domain.clone())
.instrument(info_span!("receive_txs")),
Expand Down Expand Up @@ -180,6 +184,45 @@ impl InclusionStage {
Ok(())
}

#[instrument(skip_all, fields(domain))]
pub async fn receive_reprocess_txs(
Comment thread
kamiyaa marked this conversation as resolved.
domain: String,
pool: InclusionStagePool,
state: DispatcherState,
) -> Result<(), LanderError> {
let poll_rate = match state.adapter.reprocess_txs_poll_rate() {
Some(s) => s,
// if no poll rate, then that means we don't worry about reprocessing txs
None => return Ok(()),
};
loop {
state.metrics.update_liveness_metric(
format!("{}::receive_reprocess_txs", STAGE_NAME).as_str(),
&domain,
);

tokio::time::sleep(poll_rate).await;
tracing::debug!(
domain,
"Checking for any transactions that needs reprocessing"
);

let txs = match state.adapter.get_reprocess_txs().await {
Ok(s) => s,
_ => continue,
};
if txs.is_empty() {
continue;
}

tracing::debug!(?txs, "Reprocessing transactions");
let mut locked_pool = pool.lock().await;
for tx in txs {
locked_pool.insert(tx.uuid.clone(), tx);
}
}
}

fn tx_ready_for_processing(
base_interval: Duration,
now: DateTime<Utc>,
Expand Down
86 changes: 86 additions & 0 deletions rust/main/lander/src/dispatcher/stages/inclusion_stage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ async fn test_processing_included_txs() {
const TXS_TO_PROCESS: usize = 3;

let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -88,6 +91,9 @@ async fn test_failed_simulation() {
const TXS_TO_PROCESS: usize = 3;

let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -125,6 +131,9 @@ async fn test_failed_estimation() {
const TXS_TO_PROCESS: usize = 3;

let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -198,6 +207,9 @@ async fn test_channel_closed_before_any_tx() {
#[tokio::test]
async fn test_transaction_status_dropped() {
let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -249,6 +261,9 @@ async fn test_transaction_not_ready_for_resubmission() {
#[tokio::test]
async fn test_failed_submission_after_simulation_and_estimation() {
let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -283,6 +298,9 @@ async fn test_failed_submission_after_simulation_and_estimation() {
#[tokio::test]
async fn test_transaction_included_immediately() {
let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand All @@ -307,6 +325,9 @@ async fn test_transaction_included_immediately() {
#[tokio::test]
async fn test_transaction_pending_then_included() {
let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_reprocess_txs_poll_rate()
.returning(|| None);
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
Expand Down Expand Up @@ -558,3 +579,68 @@ async fn test_reasonable_receipt_query_frequency() {
queries_per_second_per_tx
);
}

#[tokio::test]
async fn test_processing_reprocess_txs() {
let txs_to_process = 4;
let (payload_db, tx_db, _) = tmp_dbs();
let (_sender, building_stage_receiver) = mpsc::channel(txs_to_process);
let (finality_stage_sender, _receiver) = mpsc::channel(txs_to_process);
let txs_created = create_random_txs_and_store_them(
txs_to_process,
&payload_db,
&tx_db,
TransactionStatus::PendingInclusion,
)
.await;

let mut mock_adapter = MockAdapter::new();
mock_adapter
.expect_estimated_block_time()
.return_const(Duration::from_millis(400));
mock_adapter
.expect_tx_status()
.returning(|_| Ok(TransactionStatus::PendingInclusion));
mock_adapter
.expect_tx_ready_for_resubmission()
.returning(|_| false);

mock_adapter
.expect_reprocess_txs_poll_rate()
.return_const(Some(Duration::from_millis(50)));
let mut txs_created_option = Some(txs_created.clone());
mock_adapter.expect_get_reprocess_txs().returning(move || {
if let Some(txs) = txs_created_option.take() {
Ok(txs)
} else {
Ok(Vec::new())
}
});

let state = DispatcherState::new(
payload_db.clone(),
tx_db.clone(),
Arc::new(mock_adapter),
DispatcherMetrics::dummy_instance(),
"test".to_string(),
);
let inclusion_stage = InclusionStage::new(
building_stage_receiver,
finality_stage_sender,
state,
"test".to_string(),
);
let pool = inclusion_stage.pool.clone();

let stage = tokio::spawn(async move { inclusion_stage.run().await });
let _ = tokio::select! {
// this arm runs indefinitely
_ = stage => {
},
// this arm is the timeout - increased to accommodate adaptive polling
_ = sleep(Duration::from_millis(500)) => {
}
};

assert!(are_all_txs_in_pool(txs_created.clone(), &pool).await);
}
2 changes: 2 additions & 0 deletions rust/main/lander/src/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ mockall::mock! {
fn update_vm_specific_metrics(&self, _tx: &Transaction, _metrics: &DispatcherMetrics);
async fn nonce_gap_exists(&self) -> bool;
async fn replace_tx(&self, _tx: &Transaction) -> Result<(), LanderError>;
fn reprocess_txs_poll_rate(&self) -> Option<std::time::Duration>;
async fn get_reprocess_txs(&self) -> Result<Vec<Transaction>, LanderError>;
}
}

Expand Down
Loading