Skip to content

Commit a760cdc

Browse files
committed
fix(mempool): fix queue rebalance
1 parent 980fdfb commit a760cdc

File tree

6 files changed

+198
-72
lines changed

6 files changed

+198
-72
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/src/types.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ mod tx_envelope_ext {
3131
fn gas_price(&self) -> Result<Option<u128>, anyhow::Error>;
3232
fn value(&self) -> Result<alloy_primitives::U256, anyhow::Error>;
3333
fn signature(&self) -> Result<&alloy_primitives::Signature, anyhow::Error>;
34+
fn nonce(&self) -> Result<u64, anyhow::Error>;
3435
}
3536

3637
impl TxEnvelopeExt for TxEnvelope {
@@ -93,6 +94,16 @@ mod tx_envelope_ext {
9394
_ => bail!("unsupported tx type"),
9495
}
9596
}
97+
98+
fn nonce(&self) -> Result<u64, anyhow::Error> {
99+
match self {
100+
TxEnvelope::Legacy(signed) => Ok(signed.tx().nonce),
101+
TxEnvelope::Eip1559(signed) => Ok(signed.tx().nonce),
102+
TxEnvelope::Eip2930(signed) => Ok(signed.tx().nonce),
103+
TxEnvelope::Eip4844(signed) => Ok(signed.tx().nonce()),
104+
_ => bail!("unsupported tx type"),
105+
}
106+
}
96107
}
97108
}
98109

mempool/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ thiserror.workspace = true
3232
futures-util.workspace = true
3333
rust_decimal = "1.36.0"
3434
lru = "0.12.5"
35+
36+
[dev-dependencies]
37+
tracing-subscriber.workspace = true

mempool/src/pools/chain_pool.rs

Lines changed: 98 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,9 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
306306
}
307307

308308
async fn add_tx(&mut self, mut tx: TxRecord) -> Result<(), MempoolError> {
309-
tracing::debug!(tx_hash = %tx.tx_hash(), "schedule tx command");
310309
let chain_id = tx.chain_id();
311310
let sender = tx.sender;
311+
tracing::debug!(chain_id, %sender, tx_hash = %tx.tx_hash(), "schedule tx command");
312312

313313
if self
314314
.tx_count_cache
@@ -339,6 +339,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
339339
if let SenderPoolState::Processing(nonce) = sender_pool_state {
340340
if nonce == tx.nonce {
341341
tracing::debug!(
342+
%sender,
342343
tx_hash = %tx.tx_hash(),
343344
nonce,
344345
"tx with the same nonce is already processing"
@@ -349,6 +350,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
349350

350351
if tx_count > tx.nonce {
351352
tracing::debug!(
353+
%sender,
352354
tx_hash = %tx.tx_hash(),
353355
tx_nonce = tx.nonce,
354356
tx_count,
@@ -408,7 +410,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
408410

409411
self.purge_over_capacity_txs();
410412

411-
tracing::debug!(%tx_hash, "adding tx to pool");
413+
tracing::debug!(chain_id, %sender, %tx_hash, "adding tx to pool");
412414
let record = QueueRecord {
413415
sender,
414416
tx_hash,
@@ -418,7 +420,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
418420
self.txs.insert(tx_hash, tx);
419421
self.add_record(record).await;
420422
self.queue_new_tx(&sender, !created).await?;
421-
tracing::debug!(%tx_hash, "tx added to pool");
423+
tracing::debug!(chain_id, %sender, %tx_hash, "tx added to pool");
422424
Ok(())
423425
}
424426

@@ -455,17 +457,21 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
455457
if update_tx_count {
456458
let queues_update = sender_pool.update_tx_count(&self.tx_count_api).await?;
457459
self.apply_queues_update(queues_update);
460+
} else {
461+
let queues_update = sender_pool.rebalance();
462+
self.apply_queues_update(queues_update);
458463
}
459464

460465
// reborrow sender pool to make borrow checker happy
461466
let Some(sender_pool) = self.sender_pools.get_mut(sender) else {
462467
return Err(MempoolError::UnknownSender(*sender));
463468
};
464469

470+
sender_pool.log_self("get_for_queueing");
465471
if let Some(tx) = sender_pool.get_for_queueing() {
466472
let gas_price = tx.sorting_gas_price;
473+
tracing::debug!(%sender, tx_count = %sender_pool.tx_count, record = ?tx, "tx queued");
467474
self.tx_price_queue.push(tx, gas_price);
468-
tracing::debug!(%sender, tx_count = %sender_pool.tx_count, "tx queued");
469475
} else if !was_suspended && sender_pool.is_suspended() {
470476
self.heartbeat_queue.insert(
471477
HeartBeatTask {
@@ -755,7 +761,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
755761
let mut sender_pool = SenderPool::new(self.chain_id, *sender);
756762
// it's an empty sender pool, we don't care about queues update
757763
if let Err(err) = sender_pool.update_tx_count(&self.tx_count_api).await {
758-
tracing::error!(?err, "failed to update tx count");
764+
tracing::error!(%sender, ?err, "failed to update tx count");
759765
}
760766
self.sender_pools.insert(*sender, sender_pool);
761767
created = true;
@@ -775,18 +781,30 @@ where
775781

776782
#[cfg(test)]
777783
mod tests {
778-
use super::*;
784+
use std::io::IsTerminal;
785+
use std::sync::Mutex;
786+
use std::sync::Once;
779787

780788
use alloy_consensus::{SignableTransaction, TxLegacy};
781789
use alloy_network::TxSignerSync;
782790
use alloy_signer_wallet::LocalWallet;
783791
use reth_primitives::{TxKind, U256};
792+
use tokio::sync::mpsc::channel;
793+
use tokio::time::sleep;
794+
use tracing_subscriber::filter::EnvFilter;
784795

785796
use common::solana_sdk::signature::Keypair;
786797
use common::solana_sdk::signature::Signature;
798+
use common::types::TxEnvelopeExt;
787799
use executor::{ExecuteRequest, ExecuteResult};
788800

789-
struct MockExecutor;
801+
use super::*;
802+
803+
#[derive(Default)]
804+
struct MockExecutor {
805+
txs: Arc<Mutex<Vec<ExecuteRequest>>>,
806+
nonces: Arc<DashMap<Address, TxNonce>>,
807+
}
790808

791809
impl Execute for MockExecutor {
792810
async fn handle_transaction(
@@ -795,6 +813,13 @@ mod tests {
795813
result_sender: Option<oneshot::Sender<ExecuteResult>>,
796814
) -> anyhow::Result<Signature> {
797815
tracing::info!(?tx_request, "mock executor: handling tx");
816+
if let Some(old) = self.nonces.insert(
817+
tx_request.recover_signer().unwrap(),
818+
tx_request.tx().nonce().unwrap() + 1,
819+
) {
820+
assert!(old <= tx_request.tx().nonce().unwrap());
821+
}
822+
self.txs.lock().unwrap().push(tx_request);
798823
if let Some(sender) = result_sender {
799824
let _ = sender.send(ExecuteResult::Success);
800825
}
@@ -816,32 +841,44 @@ mod tests {
816841
}
817842

818843
#[derive(Clone)]
819-
struct MockGetTxCount;
844+
struct MockGetTxCount(Arc<DashMap<Address, TxNonce>>);
820845

821846
impl GetTxCountTrait for MockGetTxCount {
822847
async fn get_transaction_count(
823848
&self,
824-
_addr: BalanceAddress,
849+
addr: BalanceAddress,
825850
_tag: Option<BlockNumberOrTag>,
826851
) -> Result<u64, NeonApiError> {
827-
Ok(0)
852+
Ok(self
853+
.0
854+
.get(&addr.address.0)
855+
.map(|ref_| *ref_.value())
856+
.unwrap_or(0))
828857
}
829858
}
830859

831860
fn create_chain_pool() -> ChainPool<MockExecutor, MockGasPrices, MockGetTxCount> {
861+
static LOGS: Once = Once::new();
862+
LOGS.call_once(|| {
863+
tracing_subscriber::fmt::fmt()
864+
.with_env_filter(EnvFilter::builder().from_env_lossy())
865+
.with_ansi(std::io::stdout().is_terminal())
866+
.init()
867+
});
832868
let config = Config {
833869
chain_id: 1,
834-
capacity: 10,
870+
capacity: 1000,
835871
capacity_high_watermark: 0.8,
836872
eviction_timeout_sec: 60,
837873
tx_cache_size: 0,
838874
tx_count_cache_size: 0,
839875
};
876+
let executor = MockExecutor::default();
840877
ChainPool::new(
841878
config,
842879
MockGasPrices,
843-
MockGetTxCount,
844-
Arc::new(MockExecutor),
880+
MockGetTxCount(executor.nonces.clone()),
881+
executor.into(),
845882
Arc::new(DashMap::new()),
846883
)
847884
}
@@ -863,14 +900,29 @@ mod tests {
863900
ExecuteRequest::new(tx.into(), 1)
864901
}
865902

903+
fn create_req_with_addr(wallet: &LocalWallet, nonce: TxNonce) -> ExecuteRequest {
904+
let mut tx = TxLegacy {
905+
nonce,
906+
gas_price: 2,
907+
gas_limit: 2_000_000,
908+
to: TxKind::Create,
909+
value: U256::ZERO,
910+
input: Default::default(),
911+
chain_id: Some(1),
912+
};
913+
let signature = wallet.sign_transaction_sync(&mut tx).unwrap();
914+
let tx = tx.into_signed(signature);
915+
ExecuteRequest::new(tx.into(), 1)
916+
}
917+
866918
fn create_tx_record(
867919
gas_price: Option<GasPrice>,
868920
sorting_gas_price: GasPrice,
869921
nonce: TxNonce,
870922
sender: Address,
871923
) -> TxRecord {
872924
TxRecord {
873-
tx_request: create_exec_req(0),
925+
tx_request: create_exec_req(nonce),
874926
tx_chain_id: Some(1),
875927
sender,
876928
nonce,
@@ -981,4 +1033,36 @@ mod tests {
9811033
let result = chain_pool.add_tx(tx1.clone()).await;
9821034
assert!(matches!(result, Err(MempoolError::NonceTooHigh)));
9831035
}
1036+
1037+
#[tokio::test]
1038+
async fn test_random_sequence_from_basic() {
1039+
let sender = LocalWallet::random();
1040+
let mut chain_pool = create_chain_pool();
1041+
let chain = [
1042+
22, 2, 5, 24, 19, 10, 6, 17, 4, 16, 11, 14, 12, 15, 0, 18, 1, 21, 8, 23, 20, 9, 3, 13,
1043+
7,
1044+
];
1045+
let chain_len = chain.len();
1046+
for nonce in chain {
1047+
let tx = create_req_with_addr(&sender, nonce);
1048+
chain_pool.add_tx(tx.try_into().unwrap()).await.unwrap();
1049+
}
1050+
1051+
let txs = chain_pool.executor.txs.clone();
1052+
let (_tx, rx) = channel(1);
1053+
tokio::spawn(chain_pool.start(rx));
1054+
sleep(Duration::from_millis(
1055+
EXEC_INTERVAL_MS * (chain_len as u64 + 5),
1056+
))
1057+
.await;
1058+
1059+
let done = txs
1060+
.lock()
1061+
.unwrap()
1062+
.iter()
1063+
.map(|tx| tx.tx().nonce().unwrap())
1064+
.collect::<Vec<_>>();
1065+
let expected = (0..25).collect::<Vec<_>>();
1066+
assert_eq!(done, expected);
1067+
}
9841068
}

mempool/src/pools/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub enum QueueUpdateMove {
3737
}
3838

3939
#[derive(Debug, Default, Clone, Eq, PartialEq)]
40+
#[must_use = "Updates must be applied"]
4041
pub struct QueuesUpdate {
4142
pub add_update: Option<QueueUpdateAdd>,
4243
pub move_update: Option<QueueUpdateMove>,

0 commit comments

Comments
 (0)