Skip to content
141 changes: 105 additions & 36 deletions crates/node/src/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

use std::{
collections::HashMap,
fs::{File, create_dir_all, remove_dir_all},
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write},
path::PathBuf,
process::{Child, Command, Stdio},
sync::{
Mutex,
atomic::{AtomicU32, Ordering},
},
thread,
time::{Duration, Instant},
};

Expand All @@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use tracing::Level;

use crate::Node;

Expand All @@ -45,28 +45,40 @@ pub struct Instance {
connection_string: String,
base_directory: PathBuf,
data_directory: PathBuf,
logs_directory: PathBuf,
geth: PathBuf,
id: u32,
handle: Option<Child>,
network_id: u64,
start_timeout: u64,
wallet: EthereumWallet,
nonces: Mutex<HashMap<Address, u64>>,
/// This vector stores [`File`] objects that we use for logging which we want to flush when the
/// node object is dropped. We do not store them in a structured fashion at the moment (in
/// separate fields) as the logic that we need to apply to them is all the same regardless of
/// what it belongs to, we just want to flush them on [`Drop`] of the node.
logs_file_to_flush: Vec<File>,
}

impl Instance {
const BASE_DIRECTORY: &str = "geth";
const DATA_DIRECTORY: &str = "data";
const LOGS_DIRECTORY: &str = "logs";

const IPC_FILE: &str = "geth.ipc";
const GENESIS_JSON_FILE: &str = "genesis.json";

const READY_MARKER: &str = "IPC endpoint opened";
const ERROR_MARKER: &str = "Fatal:";

const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";

/// Create the node directory and call `geth init` to configure the genesis.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> {
create_dir_all(&self.base_directory)?;
create_dir_all(&self.logs_directory)?;

let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE);
File::create(&genesis_path)?.write_all(genesis.as_bytes())?;
Expand Down Expand Up @@ -96,8 +108,24 @@ impl Instance {

/// Spawn the go-ethereum node child process.
///
/// [Instance::init] must be called priorly.
/// [Instance::init] must be called prior.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
// This is the `OpenOptions` that we wish to use for all of the log files that we will be
// opening in this method. We need to construct it in this way to:
// 1. Be consistent
// 2. Less verbose and more dry
// 3. Because the builder pattern uses mutable references so we need to get around that.
let open_options = {
let mut options = OpenOptions::new();
options.create(true).truncate(true).write(true);
options
};

let stdout_logs_file = open_options
.clone()
.open(self.geth_stdout_log_file_path())?;
let stderr_logs_file = open_options.open(self.geth_stderr_log_file_path())?;
self.handle = Command::new(&self.geth)
.arg("--dev")
.arg("--datadir")
Expand All @@ -109,49 +137,67 @@ impl Instance {
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.stderr(Stdio::piped())
.stdout(Stdio::null())
.stderr(stderr_logs_file.try_clone()?)
.stdout(stdout_logs_file.try_clone()?)
.spawn()?
.into();

if let Err(error) = self.wait_ready() {
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
self.shutdown()?;
return Err(error);
}

self.logs_file_to_flush
.extend([stderr_logs_file, stdout_logs_file]);

Ok(self)
}

/// Wait for the g-ethereum node child process getting ready.
///
/// [Instance::spawn_process] must be called priorly.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
// Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
#[allow(clippy::zombie_processes)]
let mut child = self.handle.take().expect("should be spawned");
let start_time = Instant::now();
let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines();
let error = loop {
let Some(Ok(line)) = stderr.next() else {
break "child process stderr reading error".to_string();
};
if line.contains(Self::ERROR_MARKER) {
break line;
}
if line.contains(Self::READY_MARKER) {
// Keep stderr alive
// https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
thread::spawn(move || for _ in stderr.by_ref() {});

self.handle = child.into();
return Ok(self);
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(self.geth_stderr_log_file_path())?;

let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = stderr.next() {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
}
if line.contains(Self::READY_MARKER) {
return Ok(self);
}
}
if Instant::now().duration_since(start_time) > maximum_wait_time {
break "spawn timeout".to_string();
anyhow::bail!("Timeout in starting geth");
}
};
}
}

let _ = child.kill();
anyhow::bail!("geth node #{} spawn error: {error}", self.id)
#[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
fn geth_stdout_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME)
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
fn geth_stderr_log_file_path(&self) -> PathBuf {
self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME)
}
}

impl EthereumNode for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn execute_transaction(
&self,
transaction: TransactionRequest,
Expand Down Expand Up @@ -241,6 +287,7 @@ impl EthereumNode for Instance {
}))
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn trace_transaction(
&self,
transaction: TransactionReceipt,
Expand All @@ -263,6 +310,7 @@ impl EthereumNode for Instance {
}))
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn state_diff(
&self,
transaction: alloy::rpc::types::TransactionReceipt,
Expand All @@ -276,6 +324,7 @@ impl EthereumNode for Instance {
}
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
let connection_string = self.connection_string.clone();
let wallet = self.wallet.clone();
Expand All @@ -299,6 +348,7 @@ impl Node for Instance {
Self {
connection_string: base_directory.join(Self::IPC_FILE).display().to_string(),
data_directory: base_directory.join(Self::DATA_DIRECTORY),
logs_directory: base_directory.join(Self::LOGS_DIRECTORY),
base_directory,
geth: config.geth.clone(),
id,
Expand All @@ -307,22 +357,46 @@ impl Node for Instance {
start_timeout: config.geth_start_timeout,
wallet: config.wallet(),
nonces: Mutex::new(HashMap::new()),
// We know that we only need to be storing 2 files so we can specify that when creating
// the vector. It's the stdout and stderr of the geth node.
logs_file_to_flush: Vec::with_capacity(2),
}
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn connection_string(&self) -> String {
self.connection_string.clone()
}

fn shutdown(self) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed.
if let Some(mut child) = self.handle.take() {
child
.kill()
.map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?;
}

// Flushing the files that we're using for keeping the logs before shutdown.
for file in self.logs_file_to_flush.iter_mut() {
file.flush()?
}

// Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore
// there's nothing to be deleted.
let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY));

Ok(())
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
self.init(genesis)?.spawn_process()?.wait_ready()?;
self.init(genesis)?.spawn_process()?;
Ok(())
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.geth)
.arg("--version")
Expand All @@ -337,14 +411,9 @@ impl Node for Instance {
}

impl Drop for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn drop(&mut self) {
tracing::info!(id = self.id, "Dropping node");
if let Some(child) = self.handle.as_mut() {
let _ = child.kill();
}
if self.base_directory.exists() {
let _ = remove_dir_all(&self.base_directory);
}
self.shutdown().expect("Failed to shutdown")
}
}

Expand Down
Loading