diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 6799488b..e1c19ddb 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -130,6 +130,9 @@ pub struct SpamCmd { /// Rate of transactions per second #[clap(short, long, default_value = "1000")] rate: u64, + /// Interval in ms for sending batches of transactions + #[clap(short, long, default_value = "200")] + interval: u64, /// Time to run the spammer for in seconds #[clap(short, long, default_value = "0")] time: u64, @@ -150,6 +153,7 @@ impl SpamCmd { rpc_url, num_txs, rate, + interval, time, blobs, signer_index, @@ -157,17 +161,15 @@ impl SpamCmd { } = self; let url: Url = rpc_url.parse()?; - Spammer::new( - url, - *signer_index, - *num_txs, - *time, - *rate, - *blobs, - *chain_id, - )? - .run() - .await + let config = spammer::SpammerConfig { + max_num_txs: *num_txs, + max_time: *time, + max_rate: *rate, + batch_interval: *interval, + blobs: *blobs, + chain_id: *chain_id, + }; + Spammer::new(url, *signer_index, config)?.run().await } } @@ -296,6 +298,9 @@ pub struct SpamContractCmd { /// Rate of transactions per second #[clap(short, long, default_value_t = 1000)] rate: u64, + /// Interval in ms for sending batches of transactions + #[clap(short, long, default_value = "200")] + interval: u64, /// Time to run the spammer for in seconds #[clap(short, long, default_value_t = 0)] time: u64, @@ -315,23 +320,22 @@ impl SpamContractCmd { rpc_url, num_txs, rate, + interval, time, signer_index, chain_id, } = self; let url = format!("http://{rpc_url}").parse()?; - Spammer::new_contract( - url, - *signer_index, - *num_txs, - *time, - *rate, - contract, - function, - args, - *chain_id, - )? - .run() - .await + let config = spammer::SpammerConfig { + max_num_txs: *num_txs, + max_time: *time, + max_rate: *rate, + batch_interval: *interval, + blobs: false, + chain_id: *chain_id, + }; + Spammer::new_contract(url, *signer_index, config, contract, function, args)? + .run() + .await } } diff --git a/utils/src/spammer.rs b/utils/src/spammer.rs index 24758389..1b0cd0b0 100644 --- a/utils/src/spammer.rs +++ b/utils/src/spammer.rs @@ -20,6 +20,9 @@ use tracing::debug; use crate::make_signers; use crate::tx::{make_signed_contract_call_tx, make_signed_eip1559_tx, make_signed_eip4844_tx}; +/// Target pool size to maintain (in number of transactions). +const TARGET_POOL_SIZE: u64 = 30_000; + struct ContractPayload { /// Contract address for contract call spamming. address: Address, @@ -29,6 +32,22 @@ struct ContractPayload { args: Vec, } +/// Configuration for the transaction spammer. +pub struct SpammerConfig { + /// Maximum number of transactions to send (0 for no limit). + pub max_num_txs: u64, + /// Maximum number of seconds to run the spammer (0 for no limit). + pub max_time: u64, + /// Maximum number of transactions to send per second. + pub max_rate: u64, + /// Number of ms between sending batches of txs. + pub batch_interval: u64, + /// Whether to send EIP-4844 blob transactions. + pub blobs: bool, + /// Chain ID for the transactions. + pub chain_id: u64, +} + /// A transaction spammer that sends Ethereum transactions at a controlled rate. /// Tracks and reports statistics on sent transactions. pub struct Spammer { @@ -44,6 +63,8 @@ pub struct Spammer { max_time: u64, /// Maximum number of transactions to send per second. max_rate: u64, + /// Number of ms between sending batches of txs (default: 200). + batch_interval: u64, /// Whether to send EIP-4844 blob transactions. blobs: bool, /// Chain ID for the transactions. @@ -53,40 +74,29 @@ pub struct Spammer { } impl Spammer { - pub fn new( - url: Url, - signer_index: usize, - max_num_txs: u64, - max_time: u64, - max_rate: u64, - blobs: bool, - chain_id: u64, - ) -> Result { + pub fn new(url: Url, signer_index: usize, config: SpammerConfig) -> Result { let signers = make_signers(); Ok(Self { id: signer_index.to_string(), client: RpcClient::new(url)?, signer: signers[signer_index].clone(), - max_num_txs, - max_time, - max_rate, - blobs, - chain_id, + max_num_txs: config.max_num_txs, + max_time: config.max_time, + max_rate: config.max_rate, + batch_interval: config.batch_interval, + blobs: config.blobs, + chain_id: config.chain_id, contract_payload: None, }) } - #[allow(clippy::too_many_arguments)] pub fn new_contract( url: Url, signer_index: usize, - max_num_txs: u64, - max_time: u64, - max_rate: u64, + config: SpammerConfig, contract: &Address, function: &str, args: &[String], - chain_id: u64, ) -> Result { let signers = make_signers(); let contract_payload = ContractPayload { @@ -98,12 +108,13 @@ impl Spammer { id: signer_index.to_string(), client: RpcClient::new(url)?, signer: signers[signer_index].clone(), - max_num_txs, - max_time, - max_rate, + max_num_txs: config.max_num_txs, + max_time: config.max_time, + max_rate: config.max_rate, + batch_interval: config.batch_interval, blobs: false, // Contract calls don't use blobs contract_payload: Some(contract_payload), - chain_id, + chain_id: config.chain_id, }) } @@ -155,6 +166,17 @@ impl Spammer { Ok(u64::from_str_radix(hex_str, 16)?) } + // Get current txpool status. + async fn get_txpool_status(&self) -> Result { + self.client.rpc_request("txpool_status", vec![]).await + } + + // Get current number of pending and queued transactions in the pool. + async fn get_mempool_count(&self) -> Result { + let status = self.get_txpool_status().await?; + Ok(status.pending + status.queued) + } + /// Generate and send transactions to the Ethereum node at a controlled rate. async fn spammer( &self, @@ -165,83 +187,112 @@ impl Spammer { // Fetch latest nonce for the sender address. let address = self.signer.address(); let latest_nonce = self.get_latest_nonce(address).await?; - debug!("Spamming {address} starting from nonce={latest_nonce}"); + let txs_per_batch = self + .max_rate + .saturating_mul(self.batch_interval) + .checked_div(1000) + .unwrap_or(0); + debug!( + "Spamming {address} starting from nonce={latest_nonce} at rate {}, sending {txs_per_batch} txs every {}ms", + self.max_rate, + self.batch_interval, + ); // Initialize nonce and counters. let mut nonce = latest_nonce; let start_time = Instant::now(); let mut txs_sent_total = 0u64; - let mut interval = time::interval(Duration::from_secs(1)); + let mut interval = time::interval(Duration::from_millis(self.batch_interval)); + loop { // Wait for next one-second tick. let _ = interval.tick().await; let interval_start = Instant::now(); - // Prepare batch of transactions for this interval. - let mut batch_entries = Vec::with_capacity(self.max_rate as usize); + // Verify the nonce for gaps + // TODO: probably this should run as a separate task + let on_chain_nonce = self.get_latest_nonce(address).await?; + // If the span between the on-chain nonce and the one we are about to send + // is too big, then probably there is a gap that doesn't allow the + // on-chain nonce too advance. + let nonce_span = nonce.saturating_sub(on_chain_nonce); + if nonce_span > self.max_rate { + debug!("Current nonce={nonce}, on-chain nonce={on_chain_nonce}. Sending 10 txs"); + let batch_entries = self.build_batch_entries(10, on_chain_nonce).await?; + if let Some(results) = self.send_raw_batch(&batch_entries).await? { + if results.len() != batch_entries.len() { + return Err(eyre::eyre!( + "Batch response count {} does not match request count {}", + results.len(), + batch_entries.len() + )); + } - for _ in 0..self.max_rate { - // Check exit conditions before creating each transaction. - if (self.max_num_txs > 0 && txs_sent_total >= self.max_num_txs) - || (self.max_time > 0 && start_time.elapsed().as_secs() >= self.max_time) - { - break; + // Report individual results. + for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) { + let mapped_result = result.map(|_| tx_bytes_len); + result_sender.send(mapped_result).await?; + } + } else { + debug!("Batch eth_sendRawTransaction timed out; skipping this tick"); + report_sender.send(interval_start).await?; + continue; } + } - // Create one transaction and sign it. - let signed_tx = if let Some(ref payload) = self.contract_payload { - // Contract call transaction - make_signed_contract_call_tx( - &self.signer, - nonce, - payload.address, - &payload.function_sig, - payload.args.as_slice(), - self.chain_id, - ) - .await? - } else if self.blobs { - // Blob transaction - make_signed_eip4844_tx(&self.signer, nonce, self.chain_id).await? - } else { - // Regular transfer - make_signed_eip1559_tx(&self.signer, nonce, self.chain_id).await? - }; - let tx_bytes = signed_tx.encoded_2718(); - let tx_bytes_len = tx_bytes.len() as u64; - - // Add to batch. - let payload = hex::encode(tx_bytes); - batch_entries.push((vec![json!(payload)], tx_bytes_len)); - - nonce += 1; - txs_sent_total += 1; + // Get current pool size and calculate dynamic send rate + let current_pool_size = self.get_mempool_count().await.unwrap_or(0); + let space_available = TARGET_POOL_SIZE.saturating_sub(current_pool_size); + let txs_to_send = if space_available < txs_per_batch { + space_available + } else { + txs_per_batch + }; + + // Continue if there is no space available + if txs_to_send == 0 { + debug!("Mempool already full. Do not send more transactions."); + let _ = report_sender.send(interval_start).await; + continue; } + // Limit the max number of transactions + let tx_count = if self.max_num_txs > 0 { + txs_to_send.min(self.max_num_txs.saturating_sub(txs_sent_total)) + } else { + txs_to_send + }; + + // Prepare batch of transactions for this interval. + let batch_entries = self.build_batch_entries(tx_count, nonce).await?; + let batch_size = batch_entries.len() as u64; + + debug!( + "Pool: {current_pool_size}/{TARGET_POOL_SIZE}, sending {batch_size} txs from nonce {nonce} (rate: {})", + self.max_rate + ); + // Send all transactions in a single batch RPC call. if !batch_entries.is_empty() { - let params: Vec<_> = batch_entries - .iter() - .map(|(params, _)| params.clone()) - .collect(); - - let results = self - .client - .rpc_batch_request("eth_sendRawTransaction", params) - .await?; - - if results.len() != batch_entries.len() { - return Err(eyre::eyre!( - "Batch response count {} does not match request count {}", - results.len(), - batch_entries.len() - )); - } + if let Some(results) = self.send_raw_batch(&batch_entries).await? { + if results.len() != batch_entries.len() { + return Err(eyre::eyre!( + "Batch response count {} does not match request count {}", + results.len(), + batch_entries.len() + )); + } + + // Report individual results. + for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) { + let mapped_result = result.map(|_| tx_bytes_len); + result_sender.send(mapped_result).await?; + } - // Report individual results. - for ((_, tx_bytes_len), result) in batch_entries.into_iter().zip(results) { - let mapped_result = result.map(|_| tx_bytes_len); - result_sender.send(mapped_result).await?; + txs_sent_total += batch_size; + nonce += batch_size; + } else { + debug!("Batch eth_sendRawTransaction timed out; skipping this tick"); } } @@ -249,7 +300,7 @@ impl Spammer { sleep(Duration::from_millis(20)).await; // Signal tracker to report stats after this batch. - report_sender.try_send(interval_start)?; + let _ = report_sender.send(interval_start).await; // Check exit conditions after each tick. if (self.max_num_txs > 0 && txs_sent_total >= self.max_num_txs) @@ -262,6 +313,68 @@ impl Spammer { Ok(()) } + async fn build_batch_entries( + &self, + tx_count: u64, + nonce: u64, + ) -> Result, u64)>> { + let mut batch_entries = Vec::with_capacity(tx_count as usize); + let mut next_nonce = nonce; + + for _ in 0..tx_count { + let signed_tx = if let Some(ref payload) = self.contract_payload { + make_signed_contract_call_tx( + &self.signer, + next_nonce, + payload.address, + &payload.function_sig, + payload.args.as_slice(), + self.chain_id, + ) + .await? + } else if self.blobs { + make_signed_eip4844_tx(&self.signer, next_nonce, self.chain_id).await? + } else { + make_signed_eip1559_tx(&self.signer, next_nonce, self.chain_id).await? + }; + + let tx_bytes = signed_tx.encoded_2718(); + let tx_bytes_len = tx_bytes.len() as u64; + let payload = hex::encode(tx_bytes); + batch_entries.push((vec![json!(payload)], tx_bytes_len)); + next_nonce += 1; + } + + Ok(batch_entries) + } + + async fn send_raw_batch( + &self, + batch_entries: &[(Vec, u64)], + ) -> Result>>> { + let params: Vec<_> = batch_entries + .iter() + .map(|(params, _)| params.clone()) + .collect(); + + match self + .client + .rpc_batch_request("eth_sendRawTransaction", params) + .await + { + Ok(responses) => Ok(Some(responses)), + Err(err) => { + if let Some(jsonrpsee_core::client::Error::RequestTimeout) = + err.downcast_ref::() + { + Ok(None) + } else { + Err(err) + } + } + } + } + // Track and report statistics on sent transactions. async fn tracker( &self, @@ -290,7 +403,7 @@ impl Spammer { sleep(Duration::from_secs(1) - elapsed).await; } - let pool_status: TxpoolStatus = self.client.rpc_request("txpool_status", vec![]).await?; + let pool_status = self.get_txpool_status().await?; debug!("{stats_last_second}; {pool_status:?}"); // Update total, then reset last second stats @@ -378,6 +491,7 @@ impl fmt::Display for Stats { } } +#[derive(Clone)] struct RpcClient { client: HttpClient, } @@ -385,7 +499,7 @@ struct RpcClient { impl RpcClient { pub fn new(url: Url) -> Result { let client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs(5)) + .request_timeout(Duration::from_secs(1)) .build(url)?; Ok(Self { client }) }