@@ -29,7 +29,7 @@ use crate::{GasPricesTrait, MempoolError};
2929
3030const EXEC_RESULT_CHANNEL_SIZE : usize = 1024 ;
3131const ONE_BLOCK_MS : u64 = 400 ;
32- const EXEC_INTERVAL_MS : u64 = ONE_BLOCK_MS ;
32+ pub const EXEC_INTERVAL_MS : u64 = ONE_BLOCK_MS ;
3333
3434pub trait GetTxCountTrait : Clone + Send + Sync + ' static {
3535 fn get_transaction_count (
@@ -60,6 +60,7 @@ pub struct Config {
6060 pub eviction_timeout_sec : u64 ,
6161 pub tx_cache_size : usize ,
6262 pub tx_count_cache_size : usize ,
63+ pub exec_interval_ms : u64 ,
6364}
6465
6566struct HeartBeatTask {
@@ -99,6 +100,7 @@ pub struct ChainPool<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> {
99100 eviction_timeout_sec : u64 ,
100101 tx_count_cache : Option < LruCache < Address , u64 > > ,
101102 tx_cache : Option < LruCache < EthTxHash , ( ) > > ,
103+ exec_interval_ms : u64 ,
102104}
103105
104106impl < E : Execute , G : GasPricesTrait , C : GetTxCountTrait > ChainPool < E , G , C > {
@@ -128,6 +130,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
128130 eviction_timeout_sec : config. eviction_timeout_sec ,
129131 tx_count_cache : init_cache ( config. tx_count_cache_size ) ,
130132 tx_cache : init_cache ( config. tx_cache_size ) ,
133+ exec_interval_ms : config. exec_interval_ms ,
131134 }
132135 }
133136
@@ -147,7 +150,12 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
147150 let mut cmd_rx = cmd_rx;
148151 let ( exec_result_tx, mut exec_result_rx) =
149152 mpsc:: channel :: < ExecutionResult > ( EXEC_RESULT_CHANNEL_SIZE ) ;
150- let mut exec_interval = tokio:: time:: interval ( Duration :: from_millis ( EXEC_INTERVAL_MS ) ) ;
153+ let interval = if self . exec_interval_ms == 0 {
154+ Duration :: from_micros ( 1 )
155+ } else {
156+ Duration :: from_millis ( self . exec_interval_ms )
157+ } ;
158+ let mut exec_interval = tokio:: time:: interval ( interval) ;
151159 loop {
152160 tokio:: select! {
153161 _ = exec_interval. tick( ) => {
@@ -779,6 +787,7 @@ where
779787 Some ( LruCache :: new ( size) )
780788}
781789
790+ // TODO: move out
782791#[ cfg( test) ]
783792mod tests {
784793 use std:: io:: IsTerminal ;
@@ -788,7 +797,9 @@ mod tests {
788797 use alloy_consensus:: { SignableTransaction , TxLegacy } ;
789798 use alloy_network:: TxSignerSync ;
790799 use alloy_signer_wallet:: LocalWallet ;
800+ use proptest:: prelude:: * ;
791801 use reth_primitives:: { TxKind , U256 } ;
802+ use tokio:: runtime:: Runtime ;
792803 use tokio:: sync:: mpsc:: channel;
793804 use tokio:: time:: sleep;
794805 use tracing_subscriber:: filter:: EnvFilter ;
@@ -872,6 +883,7 @@ mod tests {
872883 eviction_timeout_sec : 60 ,
873884 tx_cache_size : 0 ,
874885 tx_count_cache_size : 0 ,
886+ exec_interval_ms : 0 ,
875887 } ;
876888 let executor = MockExecutor :: default ( ) ;
877889 ChainPool :: new (
@@ -1034,35 +1046,56 @@ mod tests {
10341046 assert ! ( matches!( result, Err ( MempoolError :: NonceTooHigh ) ) ) ;
10351047 }
10361048
1037- #[ tokio:: test]
1038- async fn test_random_sequence_from_basic ( ) {
1049+ async fn test_sequence ( nonces : Vec < u64 > ) {
10391050 let sender = LocalWallet :: random ( ) ;
10401051 let mut chain_pool = create_chain_pool ( ) ;
1041- let chain = [
1042- 22 , 2 , 5 , 24 , 19 , 10 , 6 , 17 , 4 , 16 , 11 , 14 , 12 , 15 , 0 , 18 , 1 , 21 , 8 , 23 , 20 , 9 , 3 , 13 ,
1043- 7 ,
1044- ] ;
1045- let chain_len = chain. len ( ) ;
1046- for nonce in chain {
1052+ let len = nonces. len ( ) ;
1053+ for nonce in nonces {
10471054 let tx = create_req_with_addr ( & sender, nonce) ;
10481055 chain_pool. add_tx ( tx. try_into ( ) . unwrap ( ) ) . await . unwrap ( ) ;
10491056 }
10501057
10511058 let txs = chain_pool. executor . txs . clone ( ) ;
10521059 let ( _tx, rx) = channel ( 1 ) ;
10531060 tokio:: spawn ( chain_pool. start ( rx) ) ;
1054- sleep ( Duration :: from_millis (
1055- EXEC_INTERVAL_MS * ( chain_len as u64 + 5 ) ,
1056- ) )
1057- . await ;
1061+ sleep ( Duration :: from_millis ( len as u64 * 2 ) ) . await ;
10581062
10591063 let done = txs
10601064 . lock ( )
10611065 . unwrap ( )
10621066 . iter ( )
10631067 . map ( |tx| tx. tx ( ) . nonce ( ) . unwrap ( ) )
10641068 . collect :: < Vec < _ > > ( ) ;
1065- let expected = ( 0 ..25 ) . collect :: < Vec < _ > > ( ) ;
1069+ let expected = ( 0 ..len as u64 ) . collect :: < Vec < _ > > ( ) ;
10661070 assert_eq ! ( done, expected) ;
10671071 }
1072+
1073+ #[ tokio:: test]
1074+ async fn test_random_sequence_from_basic ( ) {
1075+ let chain = [
1076+ 22 , 2 , 5 , 24 , 19 , 10 , 6 , 17 , 4 , 16 , 11 , 14 , 12 , 15 , 0 , 18 , 1 , 21 , 8 , 23 , 20 , 9 , 3 , 13 ,
1077+ 7 ,
1078+ ] ;
1079+ test_sequence ( chain. into ( ) ) . await
1080+ }
1081+
1082+ #[ tokio:: test]
1083+ async fn test_simple_sequence ( ) {
1084+ let chain = [ 0 , 1 ] ;
1085+ test_sequence ( chain. into ( ) ) . await
1086+ }
1087+
1088+ proptest ! {
1089+ /// Tests that all transactions from an arbitrary shuffled continuous valid sequence
1090+ /// get eventually queued.
1091+ #[ test]
1092+ #[ ignore] // Won't work with timers in chain pool
1093+ fn test_continuous_sequence(
1094+ nonces in ( 1 ..300u64 ) // Sequence of length from 1 to 300
1095+ . prop_map( |len| ( 0 ..len) . collect:: <Vec <_>>( ) ) // of all numbers from 0 to len
1096+ . prop_shuffle( ) // arbitrarily shuffled
1097+ ) {
1098+ Runtime :: new( ) . unwrap( ) . block_on( test_sequence( nonces) ) ;
1099+ }
1100+ }
10681101}
0 commit comments