diff --git a/crates/node/src/geth.rs b/crates/node/src/geth.rs
index 066b3769..99893a05 100644
--- a/crates/node/src/geth.rs
+++ b/crates/node/src/geth.rs
@@ -2,7 +2,7 @@ use std::{
     collections::HashMap,
-    fs::{File, create_dir_all, remove_dir_all},
+    fs::{File, OpenOptions, create_dir_all, remove_dir_all},
     io::{BufRead, BufReader, Read, Write},
     path::PathBuf,
     process::{Child, Command, Stdio},
@@ -10,7 +10,6 @@ use std::{
         Mutex,
         atomic::{AtomicU32, Ordering},
     },
-    thread,
     time::{Duration, Instant},
 };
@@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
     EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
     transaction::execute_transaction,
 };
+use tracing::Level;

 use crate::Node;

@@ -45,6 +45,7 @@ pub struct Instance {
     connection_string: String,
     base_directory: PathBuf,
     data_directory: PathBuf,
+    logs_directory: PathBuf,
     geth: PathBuf,
     id: u32,
     handle: Option<Child>,
@@ -52,11 +53,17 @@
     start_timeout: u64,
     wallet: EthereumWallet,
     nonces: Mutex>,
+    /// This vector stores the [`File`] objects that we use for logging and that we want to flush
+    /// when the node object is dropped. We do not store them in separate fields at the moment
+    /// because the logic applied to them is the same regardless of what they belong to: we simply
+    /// want to flush them on [`Drop`] of the node.
+    logs_file_to_flush: Vec<File>,
 }

 impl Instance {
     const BASE_DIRECTORY: &str = "geth";
     const DATA_DIRECTORY: &str = "data";
+    const LOGS_DIRECTORY: &str = "logs";
     const IPC_FILE: &str = "geth.ipc";
     const GENESIS_JSON_FILE: &str = "genesis.json";
@@ -64,9 +71,14 @@ impl Instance {
     const READY_MARKER: &str = "IPC endpoint opened";
     const ERROR_MARKER: &str = "Fatal:";

+    const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
+    const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
+
     /// Create the node directory and call `geth init` to configure the genesis.
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> {
         create_dir_all(&self.base_directory)?;
+        create_dir_all(&self.logs_directory)?;

         let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE);
         File::create(&genesis_path)?.write_all(genesis.as_bytes())?;
@@ -96,8 +108,24 @@ impl Instance {
     /// Spawn the go-ethereum node child process.
     ///
-    /// [Instance::init] must be called priorly.
+    /// [Instance::init] must be called beforehand.
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
+        // These are the `OpenOptions` that we want to use for every log file opened in this
+        // method. We construct them once so that:
+        // 1. The options stay consistent across files.
+        // 2. The code stays less verbose and more DRY.
+        // 3. We work around the builder pattern operating on mutable references.
+        let open_options = {
+            let mut options = OpenOptions::new();
+            options.create(true).truncate(true).write(true);
+            options
+        };
+
+        let stdout_logs_file = open_options
+            .clone()
+            .open(self.geth_stdout_log_file_path())?;
+        let stderr_logs_file = open_options.open(self.geth_stderr_log_file_path())?;
         self.handle = Command::new(&self.geth)
             .arg("--dev")
             .arg("--datadir")
@@ -109,49 +137,67 @@
             .arg("--nodiscover")
             .arg("--maxpeers")
             .arg("0")
-            .stderr(Stdio::piped())
-            .stdout(Stdio::null())
+            .stderr(stderr_logs_file.try_clone()?)
+            .stdout(stdout_logs_file.try_clone()?)
             .spawn()?
             .into();
+
+        if let Err(error) = self.wait_ready() {
+            tracing::error!(?error, "Failed to start geth, shutting down gracefully");
+            self.shutdown()?;
+            return Err(error);
+        }
+
+        self.logs_file_to_flush
+            .extend([stderr_logs_file, stdout_logs_file]);
+
         Ok(self)
     }

     /// Wait for the g-ethereum node child process getting ready.
     ///
     /// [Instance::spawn_process] must be called priorly.
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
-        // Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
-        #[allow(clippy::zombie_processes)]
-        let mut child = self.handle.take().expect("should be spawned");
         let start_time = Instant::now();
-        let maximum_wait_time = Duration::from_millis(self.start_timeout);
-        let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines();
-        let error = loop {
-            let Some(Ok(line)) = stderr.next() else {
-                break "child process stderr reading error".to_string();
-            };
-            if line.contains(Self::ERROR_MARKER) {
-                break line;
-            }
-            if line.contains(Self::READY_MARKER) {
-                // Keep stderr alive
-                // https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
-                thread::spawn(move || for _ in stderr.by_ref() {});
-                self.handle = child.into();
-                return Ok(self);
+        let logs_file = OpenOptions::new()
+            .read(true)
+            .write(false)
+            .append(false)
+            .truncate(false)
+            .open(self.geth_stderr_log_file_path())?;
+
+        let maximum_wait_time = Duration::from_millis(self.start_timeout);
+        let mut stderr = BufReader::new(logs_file).lines();
+        loop {
+            if let Some(Ok(line)) = stderr.next() {
+                if line.contains(Self::ERROR_MARKER) {
+                    anyhow::bail!("Failed to start geth: {line}");
+                }
+                if line.contains(Self::READY_MARKER) {
+                    return Ok(self);
+                }
             }
             if Instant::now().duration_since(start_time) > maximum_wait_time {
-                break "spawn timeout".to_string();
+                anyhow::bail!("Timed out while starting geth");
             }
-        };
+        }
+    }

-        let _ = child.kill();
-        anyhow::bail!("geth node #{} spawn error: {error}", self.id)
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
+    fn geth_stdout_log_file_path(&self) -> PathBuf {
+        self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME)
+    }
+
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
+    fn geth_stderr_log_file_path(&self) -> PathBuf {
+        self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME)
     }
 }

 impl EthereumNode for Instance {
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn execute_transaction(
         &self,
         transaction: TransactionRequest,
@@ -241,6 +287,7 @@
         }))
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn trace_transaction(
         &self,
         transaction: TransactionReceipt,
@@ -263,6 +310,7 @@
         }))
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn state_diff(
         &self,
         transaction: alloy::rpc::types::TransactionReceipt,
@@ -276,6 +324,7 @@
         }
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn fetch_add_nonce(&self, address: Address) -> anyhow::Result {
         let connection_string = self.connection_string.clone();
         let wallet = self.wallet.clone();
@@ -299,6 +348,7 @@ impl Node for Instance {
         Self {
             connection_string: base_directory.join(Self::IPC_FILE).display().to_string(),
             data_directory: base_directory.join(Self::DATA_DIRECTORY),
+            logs_directory: base_directory.join(Self::LOGS_DIRECTORY),
             base_directory,
             geth: config.geth.clone(),
             id,
@@ -307,22 +357,46 @@
             start_timeout: config.geth_start_timeout,
             wallet: config.wallet(),
             nonces: Mutex::new(HashMap::new()),
+            // We only ever store two files here (the stdout and stderr of the geth node), so we
+            // can create the vector with that capacity.
+            logs_file_to_flush: Vec::with_capacity(2),
         }
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn connection_string(&self) -> String {
         self.connection_string.clone()
     }

-    fn shutdown(self) -> anyhow::Result<()> {
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
+    fn shutdown(&mut self) -> anyhow::Result<()> {
+        // Terminate the process so that it stops producing output before the log files are flushed.
+        if let Some(mut child) = self.handle.take() {
+            child
+                .kill()
+                .map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?;
+        }
+
+        // Flush the log files before completing the shutdown.
+        for file in self.logs_file_to_flush.iter_mut() {
+            file.flush()?
+        }
+
+        // Remove the node's database so that subsequent runs do not reuse it. We ignore the error
+        // because the directory may not have existed in the first place, in which case there is
+        // nothing to delete.
+        let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY));
+
         Ok(())
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
-        self.init(genesis)?.spawn_process()?.wait_ready()?;
+        self.init(genesis)?.spawn_process()?;
         Ok(())
     }

+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn version(&self) -> anyhow::Result<String> {
         let output = Command::new(&self.geth)
             .arg("--version")
@@ -337,14 +411,9 @@
 }

 impl Drop for Instance {
+    #[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
     fn drop(&mut self) {
-        tracing::info!(id = self.id, "Dropping node");
-        if let Some(child) = self.handle.as_mut() {
-            let _ = child.kill();
-        }
-        if self.base_directory.exists() {
-            let _ = remove_dir_all(&self.base_directory);
-        }
+        self.shutdown().expect("Failed to shutdown")
     }
 }

diff --git a/crates/node/src/kitchensink.rs b/crates/node/src/kitchensink.rs
index 239fe13c..a45337e4 100644
--- a/crates/node/src/kitchensink.rs
+++ b/crates/node/src/kitchensink.rs
@@ -1,8 +1,8 @@
 use std::{
     collections::HashMap,
-    fs::create_dir_all,
-    io::BufRead,
-    path::PathBuf,
+    fs::{File, OpenOptions, create_dir_all, remove_dir_all},
+    io::{BufRead, Write},
+    path::{Path, PathBuf},
     process::{Child, Command, Stdio},
     sync::{
         Mutex,
@@ -30,6 +30,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{Value as JsonValue, json};
 use sp_core::crypto::Ss58Codec;
 use sp_runtime::AccountId32;
+use tracing::Level;

 use revive_dt_config::Arguments;
 use revive_dt_node_interaction::{
@@ -49,13 +50,22 @@ pub struct KitchensinkNode {
     rpc_url: String,
     wallet: EthereumWallet,
     base_directory: PathBuf,
+    logs_directory: PathBuf,
     process_substrate: Option<Child>,
     process_proxy: Option<Child>,
     nonces: Mutex>,
+    /// This vector stores the [`File`] objects that we use for logging and that we want to flush
+    /// when the node object is dropped. We do not store them in separate fields at the moment
+    /// because the logic applied to them is the same regardless of what they belong to: we simply
+    /// want to flush them on [`Drop`] of the node.
+    logs_file_to_flush: Vec<File>,
 }

 impl KitchensinkNode {
     const BASE_DIRECTORY: &str = "kitchensink";
+    const LOGS_DIRECTORY: &str = "logs";
+    const DATA_DIRECTORY: &str = "chains";
+
     const SUBSTRATE_READY_MARKER: &str = "Running JSON-RPC server";
     const ETH_PROXY_READY_MARKER: &str = "Running JSON-RPC server";
     const CHAIN_SPEC_JSON_FILE: &str = "template_chainspec.json";
@@ -65,11 +75,21 @@ impl KitchensinkNode {
     const SUBSTRATE_LOG_ENV: &str = "error,evm=debug,sc_rpc_server=info,runtime::revive=debug";
     const PROXY_LOG_ENV: &str = "info,eth-rpc=debug";

+    const KITCHENSINK_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
+    const KITCHENSINK_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
+
+    const PROXY_STDOUT_LOG_FILE_NAME: &str = "proxy_stdout.log";
+    const PROXY_STDERR_LOG_FILE_NAME: &str = "proxy_stderr.log";
+
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn init(&mut self, genesis: &str) -> anyhow::Result<&mut Self> {
         create_dir_all(&self.base_directory)?;
+        create_dir_all(&self.logs_directory)?;

         let template_chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);

+        // Note: we do not pipe the logs of this process into a separate file since this is just a
+        // one-off export of the default chain spec, not part of the long-running node process.
         let output = Command::new(&self.substrate_binary)
             .arg("export-chain-spec")
             .arg("--chain")
@@ -118,6 +138,7 @@
         Ok(self)
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn spawn_process(&mut self) -> anyhow::Result<()> {
         let substrate_rpc_port = Self::BASE_SUBSTRATE_RPC_PORT + self.id as u16;
         let proxy_rpc_port = Self::BASE_PROXY_RPC_PORT + self.id as u16;
@@ -126,8 +147,25 @@
         let chainspec_path = self.base_directory.join(Self::CHAIN_SPEC_JSON_FILE);

+        // These are the `OpenOptions` that we want to use for every log file opened in this
+        // method. We construct them once so that:
+        // 1. The options stay consistent across files.
+        // 2. The code stays less verbose and more DRY.
+        // 3. We work around the builder pattern operating on mutable references.
+        let open_options = {
+            let mut options = OpenOptions::new();
+            options.create(true).truncate(true).write(true);
+            options
+        };
+
         // Start Substrate node
-        let mut substrate_process = Command::new(&self.substrate_binary)
+        let kitchensink_stdout_logs_file = open_options
+            .clone()
+            .open(self.kitchensink_stdout_log_file_path())?;
+        let kitchensink_stderr_logs_file = open_options
+            .clone()
+            .open(self.kitchensink_stderr_log_file_path())?;
+        self.process_substrate = Command::new(&self.substrate_binary)
             .arg("--chain")
             .arg(chainspec_path)
             .arg("--base-path")
@@ -142,40 +180,61 @@
             .arg("--rpc-cors")
             .arg("all")
             .env("RUST_LOG", Self::SUBSTRATE_LOG_ENV)
-            .stdout(Stdio::null())
-            .stderr(Stdio::piped())
-            .spawn()?;
+            .stdout(kitchensink_stdout_logs_file.try_clone()?)
+            .stderr(kitchensink_stderr_logs_file.try_clone()?)
+            .spawn()?
+            .into();

         // Give the node a moment to boot
-        Self::wait_ready(
-            &mut substrate_process,
+        if let Err(error) = Self::wait_ready(
+            self.kitchensink_stderr_log_file_path().as_path(),
             Self::SUBSTRATE_READY_MARKER,
             Duration::from_secs(30),
-        )?;
-
-        let mut proxy_process = Command::new(&self.eth_proxy_binary)
+        ) {
+            tracing::error!(
+                ?error,
+                "Failed to start substrate, shutting down gracefully"
+            );
+            self.shutdown()?;
+            return Err(error);
+        };
+
+        let eth_proxy_stdout_logs_file = open_options
+            .clone()
+            .open(self.proxy_stdout_log_file_path())?;
+        let eth_proxy_stderr_logs_file = open_options.open(self.proxy_stderr_log_file_path())?;
+        self.process_proxy = Command::new(&self.eth_proxy_binary)
             .arg("--dev")
             .arg("--rpc-port")
             .arg(proxy_rpc_port.to_string())
             .arg("--node-rpc-url")
             .arg(format!("ws://127.0.0.1:{substrate_rpc_port}"))
             .env("RUST_LOG", Self::PROXY_LOG_ENV)
-            .stdout(Stdio::null())
-            .stderr(Stdio::piped())
-            .spawn()?;
+            .stdout(eth_proxy_stdout_logs_file.try_clone()?)
+            .stderr(eth_proxy_stderr_logs_file.try_clone()?)
+            .spawn()?
+            .into();

-        Self::wait_ready(
-            &mut proxy_process,
+        if let Err(error) = Self::wait_ready(
+            self.proxy_stderr_log_file_path().as_path(),
             Self::ETH_PROXY_READY_MARKER,
             Duration::from_secs(30),
-        )?;
-
-        self.process_substrate = Some(substrate_process);
-        self.process_proxy = Some(proxy_process);
+        ) {
+            tracing::error!(?error, "Failed to start proxy, shutting down gracefully");
+            self.shutdown()?;
+            return Err(error);
+        };
+
+        self.logs_file_to_flush.extend([
+            kitchensink_stdout_logs_file,
+            kitchensink_stderr_logs_file,
+            eth_proxy_stdout_logs_file,
+            eth_proxy_stderr_logs_file,
+        ]);

         Ok(())
     }
-
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn extract_balance_from_genesis_file(
         &self,
         genesis_str: &str,
@@ -216,27 +275,30 @@
         Ok(account_id.to_ss58check())
     }

-    fn wait_ready(child: &mut Child, marker: &str, timeout: Duration) -> anyhow::Result<()> {
+    fn wait_ready(logs_file_path: &Path, marker: &str, timeout: Duration) -> anyhow::Result<()> {
         let start_time = std::time::Instant::now();

-        let stderr = child.stderr.take().expect("stderr must be piped");
-
-        let mut lines = std::io::BufReader::new(stderr).lines();
+        let logs_file = OpenOptions::new()
+            .read(true)
+            .write(false)
+            .append(false)
+            .truncate(false)
+            .open(logs_file_path)?;
+
+        let mut lines = std::io::BufReader::new(logs_file).lines();
         loop {
             if let Some(Ok(line)) = lines.next() {
-                println!("Kitchensink log: {line:?}");
                 if line.contains(marker) {
-                    std::thread::spawn(move || for _ in lines.by_ref() {});
                     return Ok(());
                 }
             }

             if start_time.elapsed() > timeout {
-                let _ = child.kill();
                 anyhow::bail!("Timeout waiting for process readiness: {marker}");
             }
         }
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     pub fn eth_rpc_version(&self) -> anyhow::Result<String> {
         let output = Command::new(&self.eth_proxy_binary)
             .arg("--version")
@@ -248,9 +310,32 @@
             .stdout;
         Ok(String::from_utf8_lossy(&output).trim().to_string())
     }
+
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
+    fn kitchensink_stdout_log_file_path(&self) -> PathBuf {
+        self.logs_directory
+            .join(Self::KITCHENSINK_STDOUT_LOG_FILE_NAME)
+    }
+
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
+    fn kitchensink_stderr_log_file_path(&self) -> PathBuf {
+        self.logs_directory
+            .join(Self::KITCHENSINK_STDERR_LOG_FILE_NAME)
+    }
+
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
+    fn proxy_stdout_log_file_path(&self) -> PathBuf {
+        self.logs_directory.join(Self::PROXY_STDOUT_LOG_FILE_NAME)
+    }
+
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id), level = Level::TRACE)]
+    fn proxy_stderr_log_file_path(&self) -> PathBuf {
+        self.logs_directory.join(Self::PROXY_STDERR_LOG_FILE_NAME)
+    }
 }

 impl EthereumNode for KitchensinkNode {
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn execute_transaction(
         &self,
         transaction: alloy::rpc::types::TransactionRequest,
@@ -276,6 +361,7 @@
         receipt
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn trace_transaction(
         &self,
         transaction: TransactionReceipt,
@@ -300,6 +386,7 @@
         }))
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn state_diff(&self, transaction: TransactionReceipt) -> anyhow::Result {
         match self
             .trace_transaction(transaction)?
@@ -310,6 +397,7 @@
         }
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn fetch_add_nonce(&self, address: Address) -> anyhow::Result {
         let url = self.rpc_url.clone();
         let wallet = self.wallet.clone();
@@ -329,6 +417,7 @@ impl Node for KitchensinkNode {
         let kitchensink_directory = config.directory().join(Self::BASE_DIRECTORY);
         let id = NODE_COUNT.fetch_add(1, Ordering::SeqCst);
         let base_directory = kitchensink_directory.join(id.to_string());
+        let logs_directory = base_directory.join(Self::LOGS_DIRECTORY);

         Self {
             id,
@@ -337,30 +426,54 @@
             rpc_url: String::new(),
             wallet: config.wallet(),
             base_directory,
+            logs_directory,
             process_substrate: None,
             process_proxy: None,
             nonces: Mutex::new(HashMap::new()),
+            // We only ever store four files here (the stdout and stderr of the substrate node
+            // and of the eth-rpc proxy), so we can create the vector with that capacity.
+            logs_file_to_flush: Vec::with_capacity(4),
         }
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn connection_string(&self) -> String {
         self.rpc_url.clone()
     }

-    fn shutdown(mut self) -> anyhow::Result<()> {
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
+    fn shutdown(&mut self) -> anyhow::Result<()> {
+        // Terminate the processes so that they stop producing output before the log files are flushed.
         if let Some(mut child) = self.process_proxy.take() {
-            let _ = child.kill();
+            child
+                .kill()
+                .map_err(|error| anyhow::anyhow!("Failed to kill the proxy process: {error:?}"))?;
         }
         if let Some(mut child) = self.process_substrate.take() {
-            let _ = child.kill();
+            child.kill().map_err(|error| {
+                anyhow::anyhow!("Failed to kill the substrate process: {error:?}")
+            })?;
+        }
+
+        // Flush the log files before completing the shutdown.
+        for file in self.logs_file_to_flush.iter_mut() {
+            file.flush()?
+        }
+
+        // Remove the node's database so that subsequent runs do not reuse it. We ignore the error
+        // because the directory may not have existed in the first place, in which case there is
+        // nothing to delete.
+        let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY));
+
         Ok(())
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
         self.init(&genesis)?.spawn_process()
     }

+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn version(&self) -> anyhow::Result<String> {
         let output = Command::new(&self.substrate_binary)
             .arg("--version")
@@ -375,13 +488,9 @@
 }

 impl Drop for KitchensinkNode {
+    #[tracing::instrument(skip_all, fields(kitchensink_node_id = self.id))]
     fn drop(&mut self) {
-        if let Some(mut child) = self.process_proxy.take() {
-            let _ = child.kill();
-        }
-        if let Some(mut child) = self.process_substrate.take() {
-            let _ = child.kill();
-        }
+        self.shutdown().expect("Failed to shutdown")
     }
 }

diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index a293c8ae..7552ae61 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -23,7 +23,7 @@ pub trait Node: EthereumNode {
     /// Prune the node instance and related data.
     ///
     /// Blocking until it's completely stopped.
-    fn shutdown(self) -> anyhow::Result<()>;
+    fn shutdown(&mut self) -> anyhow::Result<()>;

     /// Returns the nodes connection string.
     fn connection_string(&self) -> String;
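
A note on the log-file redirection used throughout this change: `std::fs::File` converts into `std::process::Stdio`, so a child's stdout and stderr can be pointed directly at files opened from one shared, pre-configured `OpenOptions`, while `try_clone` keeps a handle on our side for flushing later. The following is a minimal, self-contained sketch of that pattern only; the file names and the `echo` child (a Unix assumption) are placeholders and not part of this diff.

use std::fs::OpenOptions;
use std::process::Command;

fn main() -> std::io::Result<()> {
    // One set of options reused for every log file: create it if missing,
    // truncate anything left over from a previous run, and open it writable.
    let open_options = {
        let mut options = OpenOptions::new();
        options.create(true).truncate(true).write(true);
        options
    };

    let stdout_log = open_options.clone().open("child_stdout.log")?;
    let stderr_log = open_options.open("child_stderr.log")?;

    // `File` implements `Into<Stdio>`, so the child writes straight into the
    // files; `try_clone` leaves us a handle we can flush or read afterwards.
    let mut child = Command::new("echo")
        .arg("hello from the child process")
        .stdout(stdout_log.try_clone()?)
        .stderr(stderr_log.try_clone()?)
        .spawn()?;

    child.wait()?;
    Ok(())
}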
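
The readiness checks above now tail the child's stderr log file for a marker instead of holding the piped stderr handle. Below is a sketch of that idea, assuming the `anyhow` crate these crates already depend on; `wait_for_marker` and its parameters are hypothetical names, and the short sleep between polls is an addition the diff itself does not make.

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `log_path` until a line containing `ready_marker` appears, a line
/// containing `error_marker` appears, or `timeout` elapses.
fn wait_for_marker(
    log_path: &Path,
    ready_marker: &str,
    error_marker: &str,
    timeout: Duration,
) -> anyhow::Result<()> {
    let started = Instant::now();
    let mut lines = BufReader::new(File::open(log_path)?).lines();

    loop {
        // The reader keeps its position, so once the child appends more
        // output the next call to `next()` picks up the new lines.
        if let Some(Ok(line)) = lines.next() {
            if line.contains(error_marker) {
                anyhow::bail!("process reported a fatal error: {line}");
            }
            if line.contains(ready_marker) {
                return Ok(());
            }
        } else {
            // Nothing new yet; back off briefly instead of spinning.
            sleep(Duration::from_millis(50));
        }

        if started.elapsed() > timeout {
            anyhow::bail!("timed out waiting for readiness marker {ready_marker:?}");
        }
    }
}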
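
Finally, the `Node::shutdown` signature in lib.rs changes from `self` to `&mut self` so that `Drop`, which only provides `&mut self`, can delegate to the same cleanup path. Here is a cut-down sketch of why that composes, using a hypothetical `ManagedProcess` type; the `wait` call that reaps the child and the non-panicking error handling in `drop` are extras not present in the diff.

use std::fs::File;
use std::io::Write;
use std::process::Child;

/// A stand-in for the node types in this diff.
struct ManagedProcess {
    handle: Option<Child>,
    logs_file_to_flush: Vec<File>,
}

impl ManagedProcess {
    fn shutdown(&mut self) -> anyhow::Result<()> {
        // `Option::take` makes the shutdown idempotent: a second call (for
        // example from `Drop` after an explicit shutdown) finds `None` and
        // does nothing.
        if let Some(mut child) = self.handle.take() {
            child.kill()?;
            // Reap the exit status so the child does not linger as a zombie.
            child.wait()?;
        }
        for file in self.logs_file_to_flush.iter_mut() {
            file.flush()?;
        }
        Ok(())
    }
}

impl Drop for ManagedProcess {
    fn drop(&mut self) {
        // `Drop` only gives us `&mut self`, which is why the trait method had
        // to change from `shutdown(self)` to `shutdown(&mut self)`.
        if let Err(error) = self.shutdown() {
            eprintln!("failed to shut down managed process: {error:?}");
        }
    }
}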