Skip to content

Commit 91709cf

Browse files
committed
fix(txpool): prevent removed transactions from being included in blocks
Previously, there was a race condition where: 1. Block builder creates a BestTransactions iterator (snapshot via BTreeMap::clone()) 2. Maintenance job removes a transaction from the pool 3. Block builder's snapshot was independent, still containing the removed transaction This could cause expired transactions with time-based validity to be included in blocks after removal. This commit adds removal notifications: - PendingPool broadcasts transaction removals via a new channel - BestTransactions iterator subscribes to removal notifications - Iterator removes transactions from its snapshot when notified - Test verifies that removed transactions are not found in snapshots Fixes race condition for transactions with time-based expiration.
1 parent 7f40013 commit 91709cf

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

crates/transaction-pool/src/pool/best.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ pub struct BestTransactions<T: TransactionOrdering> {
9999
/// These new pending transactions are inserted into this iterator's pool before yielding the
100100
/// next value
101101
pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
102+
/// Used to receive transaction removals from the pool after this iterator was created.
103+
///
104+
/// Removed transactions are deleted from this iterator's snapshot before yielding the next
105+
/// value, preventing inclusion of transactions that were removed by maintenance jobs.
106+
pub(crate) removed_transaction_receiver: Option<Receiver<TransactionId>>,
102107
/// The priority value of most recently yielded transaction.
103108
///
104109
/// This is required if new pending transactions are fed in while it yields new values.
@@ -156,6 +161,19 @@ impl<T: TransactionOrdering> BestTransactions<T> {
156161
}
157162
}
158163

164+
/// Non-blocking read on the removed transactions subscription channel
165+
fn try_recv_removed(&mut self) -> Option<TransactionId> {
166+
loop {
167+
match self.removed_transaction_receiver.as_mut()?.try_recv() {
168+
Ok(tx_id) => return Some(tx_id),
169+
Err(TryRecvError::Lagged(_)) => {
170+
// Handle the case where the receiver lagged too far behind.
171+
}
172+
Err(_) => return None,
173+
}
174+
}
175+
}
176+
159177
/// Removes the currently best independent transaction from the independent set and the total
160178
/// set.
161179
fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
@@ -164,6 +182,21 @@ impl<T: TransactionOrdering> BestTransactions<T> {
164182
})
165183
}
166184

185+
/// Removes transactions that have been removed from the `PendingPool` after this iterator was
186+
/// created
187+
fn remove_removed_transactions(&mut self) {
188+
for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
189+
if let Some(tx_id) = self.try_recv_removed() {
190+
// Remove from both the snapshot and independent set
191+
if let Some(tx) = self.all.remove(&tx_id) {
192+
self.independent.remove(&tx);
193+
}
194+
} else {
195+
break;
196+
}
197+
}
198+
}
199+
167200
/// Checks for new transactions that have come into the `PendingPool` after this iterator was
168201
/// created and inserts them
169202
fn add_new_transactions(&mut self) {
@@ -189,6 +222,7 @@ impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransaction
189222

190223
fn no_updates(&mut self) {
191224
self.new_transaction_receiver.take();
225+
self.removed_transaction_receiver.take();
192226
self.last_priority.take();
193227
}
194228

@@ -207,6 +241,7 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
207241
fn next(&mut self) -> Option<Self::Item> {
208242
loop {
209243
self.add_new_transactions();
244+
self.remove_removed_transactions();
210245
// Remove the next independent tx with the highest priority
211246
let best = self.pop_best()?;
212247
let sender_id = best.transaction.sender_id();

crates/transaction-pool/src/pool/pending.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct PendingPool<T: TransactionOrdering> {
5050
/// Used to broadcast new transactions that have been added to the `PendingPool` to existing
5151
/// `static_files` of this pool.
5252
new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
53+
/// Used to broadcast transaction removals to existing iterators of this pool.
54+
removed_transaction_notifier: broadcast::Sender<TransactionId>,
5355
}
5456

5557
// === impl PendingPool ===
@@ -63,6 +65,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
6365
/// Create a new pool instance with the given buffer capacity.
6466
pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
6567
let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
68+
let (removed_transaction_notifier, _) = broadcast::channel(buffer_capacity);
6669
Self {
6770
ordering,
6871
submission_id: 0,
@@ -71,6 +74,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
7174
highest_nonces: Default::default(),
7275
size_of: Default::default(),
7376
new_transaction_notifier,
77+
removed_transaction_notifier,
7478
}
7579
}
7680

@@ -111,6 +115,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
111115
independent: self.independent_transactions.values().cloned().collect(),
112116
invalid: Default::default(),
113117
new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
118+
removed_transaction_receiver: Some(self.removed_transaction_notifier.subscribe()),
114119
last_priority: None,
115120
skip_blobs: false,
116121
}
@@ -334,6 +339,11 @@ impl<T: TransactionOrdering> PendingPool<T> {
334339
let tx = self.by_id.remove(id)?;
335340
self.size_of -= tx.transaction.size();
336341

342+
// Broadcast the removal to any existing iterators
343+
if self.removed_transaction_notifier.receiver_count() > 0 {
344+
let _ = self.removed_transaction_notifier.send(*id);
345+
}
346+
337347
match self.highest_nonces.entry(id.sender) {
338348
Entry::Occupied(mut entry) => {
339349
if entry.get().transaction.nonce() == id.nonce {

crates/transaction-pool/src/pool/txpool.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4523,4 +4523,74 @@ mod tests {
45234523
"Non-4844 tx should gain ENOUGH_FEE_CAP_BLOCK bit after basefee decrease"
45244524
);
45254525
}
4526+
4527+
/// Test that verifies removed transactions are not included by existing iterators.
4528+
///
4529+
/// This test simulates a race condition where:
4530+
/// 1. Block builder creates an iterator (takes snapshot of pool)
4531+
/// 2. Maintenance job removes a transaction from the pool
4532+
/// 3. Block builder continues iterating
4533+
///
4534+
/// The test verifies that removal notifications reach the iterator, so the removed
4535+
/// transaction is not returned even though it was in the original snapshot.
4536+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
4537+
async fn test_race_condition_snapshot_vs_removal() {
4538+
use parking_lot::RwLock;
4539+
use std::{
4540+
sync::{Arc, Barrier, Mutex},
4541+
time::Duration,
4542+
};
4543+
4544+
let mut pool = TxPool::new(MockOrdering::default(), Default::default());
4545+
let mut factory = MockTransactionFactory::default();
4546+
4547+
let tx = factory.validated(MockTransaction::eip1559().with_nonce(0));
4548+
let tx_hash = *tx.hash();
4549+
4550+
pool.add_transaction(tx, U256::from(1000), 0, None).unwrap();
4551+
assert_eq!(pool.pending().len(), 1);
4552+
4553+
let pool = Arc::new(RwLock::new(pool));
4554+
let pool1 = pool.clone();
4555+
let pool2 = pool.clone();
4556+
4557+
let found_in_snapshot = Arc::new(Mutex::new(false));
4558+
let was_removed = Arc::new(Mutex::new(false));
4559+
let found_ref = found_in_snapshot.clone();
4560+
let removed_ref = was_removed.clone();
4561+
4562+
let barrier = Arc::new(Barrier::new(2));
4563+
let barrier1 = barrier.clone();
4564+
let barrier2 = barrier.clone();
4565+
4566+
// Thread 1: Create iterator and collect transactions (simulates block builder)
4567+
let snapshot_handle = tokio::task::spawn_blocking(move || {
4568+
let best_txs = pool1.read().best_transactions();
4569+
barrier1.wait();
4570+
4571+
// Simulate block building delay
4572+
std::thread::sleep(Duration::from_millis(100));
4573+
4574+
let txs: Vec<_> = best_txs.collect();
4575+
*found_ref.lock().unwrap() = !txs.is_empty();
4576+
});
4577+
4578+
// Thread 2: Remove transaction (simulates maintenance job)
4579+
let removal_handle = tokio::task::spawn_blocking(move || {
4580+
barrier2.wait();
4581+
std::thread::sleep(Duration::from_millis(20));
4582+
4583+
let removed = pool2.write().remove_transactions(vec![tx_hash]);
4584+
*removed_ref.lock().unwrap() = !removed.is_empty();
4585+
});
4586+
4587+
snapshot_handle.await.unwrap();
4588+
removal_handle.await.unwrap();
4589+
4590+
let found = *found_in_snapshot.lock().unwrap();
4591+
let removed = *was_removed.lock().unwrap();
4592+
4593+
assert!(removed, "Transaction should have been removed from pool");
4594+
assert!(!found, "Iterator should not return removed transaction");
4595+
}
45264596
}

0 commit comments

Comments
 (0)