Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions crates/transaction-pool/src/pool/best.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ pub struct BestTransactions<T: TransactionOrdering> {
/// These new pending transactions are inserted into this iterator's pool before yielding the
/// next value
pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
/// Used to receive transaction removals from the pool after this iterator was created.
///
/// Removed transactions are deleted from this iterator's snapshot before yielding the next
/// value, preventing inclusion of transactions that were removed by maintenance jobs.
pub(crate) removed_transaction_receiver: Option<Receiver<TransactionId>>,
Comment on lines 101 to +106
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not opposed to this, but since this is an internal channel we can unify this by introducing

enum PendingPoolEvent<T> {Added(Tx),Removed(tx)}

/// The priority value of most recently yielded transaction.
///
/// This is required if new pending transactions are fed in while it yields new values.
Expand Down Expand Up @@ -156,6 +161,19 @@ impl<T: TransactionOrdering> BestTransactions<T> {
}
}

/// Non-blocking read on the removed transactions subscription channel
fn try_recv_removed(&mut self) -> Option<TransactionId> {
loop {
match self.removed_transaction_receiver.as_mut()?.try_recv() {
Ok(tx_id) => return Some(tx_id),
Err(TryRecvError::Lagged(_)) => {
// Handle the case where the receiver lagged too far behind.
}
Err(_) => return None,
}
}
}

/// Removes the currently best independent transaction from the independent set and the total
/// set.
fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
Expand All @@ -164,6 +182,21 @@ impl<T: TransactionOrdering> BestTransactions<T> {
})
}

/// Removes transactions that have been removed from the `PendingPool` after this iterator was
/// created
fn remove_removed_transactions(&mut self) {
for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
if let Some(tx_id) = self.try_recv_removed() {
// Remove from both the snapshot and independent set
if let Some(tx) = self.all.remove(&tx_id) {
self.independent.remove(&tx);
}
} else {
break;
}
}
}

/// Checks for new transactions that have come into the `PendingPool` after this iterator was
/// created and inserts them
fn add_new_transactions(&mut self) {
Expand All @@ -189,6 +222,7 @@ impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransaction

fn no_updates(&mut self) {
self.new_transaction_receiver.take();
self.removed_transaction_receiver.take();
self.last_priority.take();
}

Expand All @@ -207,6 +241,7 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
fn next(&mut self) -> Option<Self::Item> {
loop {
self.add_new_transactions();
self.remove_removed_transactions();
// Remove the next independent tx with the highest priority
let best = self.pop_best()?;
let sender_id = best.transaction.sender_id();
Expand Down
10 changes: 10 additions & 0 deletions crates/transaction-pool/src/pool/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct PendingPool<T: TransactionOrdering> {
/// Used to broadcast new transactions that have been added to the `PendingPool` to existing
/// `static_files` of this pool.
new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
/// Used to broadcast transaction removals to existing iterators of this pool.
removed_transaction_notifier: broadcast::Sender<TransactionId>,
}

// === impl PendingPool ===
Expand All @@ -63,6 +65,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance with the given buffer capacity.
pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
let (removed_transaction_notifier, _) = broadcast::channel(buffer_capacity);
Self {
ordering,
submission_id: 0,
Expand All @@ -71,6 +74,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
highest_nonces: Default::default(),
size_of: Default::default(),
new_transaction_notifier,
removed_transaction_notifier,
}
}

Expand Down Expand Up @@ -111,6 +115,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
independent: self.independent_transactions.values().cloned().collect(),
invalid: Default::default(),
new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
removed_transaction_receiver: Some(self.removed_transaction_notifier.subscribe()),
last_priority: None,
skip_blobs: false,
}
Expand Down Expand Up @@ -334,6 +339,11 @@ impl<T: TransactionOrdering> PendingPool<T> {
let tx = self.by_id.remove(id)?;
self.size_of -= tx.transaction.size();

// Broadcast the removal to any existing iterators
if self.removed_transaction_notifier.receiver_count() > 0 {
let _ = self.removed_transaction_notifier.send(*id);
}

match self.highest_nonces.entry(id.sender) {
Entry::Occupied(mut entry) => {
if entry.get().transaction.nonce() == id.nonce {
Expand Down
70 changes: 70 additions & 0 deletions crates/transaction-pool/src/pool/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4523,4 +4523,74 @@ mod tests {
"Non-4844 tx should gain ENOUGH_FEE_CAP_BLOCK bit after basefee decrease"
);
}

/// Test that verifies removed transactions are not included by existing iterators.
///
/// This test simulates a race condition where:
/// 1. Block builder creates an iterator (takes snapshot of pool)
/// 2. Maintenance job removes a transaction from the pool
/// 3. Block builder continues iterating
///
/// The test verifies that removal notifications reach the iterator, so the removed
/// transaction is not returned even though it was in the original snapshot.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_race_condition_snapshot_vs_removal() {
use parking_lot::RwLock;
use std::{
sync::{Arc, Barrier, Mutex},
time::Duration,
};

let mut pool = TxPool::new(MockOrdering::default(), Default::default());
let mut factory = MockTransactionFactory::default();

let tx = factory.validated(MockTransaction::eip1559().with_nonce(0));
let tx_hash = *tx.hash();

pool.add_transaction(tx, U256::from(1000), 0, None).unwrap();
assert_eq!(pool.pending().len(), 1);

let pool = Arc::new(RwLock::new(pool));
let pool1 = pool.clone();
let pool2 = pool.clone();

let found_in_snapshot = Arc::new(Mutex::new(false));
let was_removed = Arc::new(Mutex::new(false));
let found_ref = found_in_snapshot.clone();
let removed_ref = was_removed.clone();

let barrier = Arc::new(Barrier::new(2));
let barrier1 = barrier.clone();
let barrier2 = barrier.clone();

// Thread 1: Create iterator and collect transactions (simulates block builder)
let snapshot_handle = tokio::task::spawn_blocking(move || {
let best_txs = pool1.read().best_transactions();
barrier1.wait();

// Simulate block building delay
std::thread::sleep(Duration::from_millis(100));

let txs: Vec<_> = best_txs.collect();
*found_ref.lock().unwrap() = !txs.is_empty();
});

// Thread 2: Remove transaction (simulates maintenance job)
let removal_handle = tokio::task::spawn_blocking(move || {
barrier2.wait();
std::thread::sleep(Duration::from_millis(20));

let removed = pool2.write().remove_transactions(vec![tx_hash]);
*removed_ref.lock().unwrap() = !removed.is_empty();
});

snapshot_handle.await.unwrap();
removal_handle.await.unwrap();

let found = *found_in_snapshot.lock().unwrap();
let removed = *was_removed.lock().unwrap();

assert!(removed, "Transaction should have been removed from pool");
assert!(!found, "Iterator should not return removed transaction");
}
}
Loading