Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing-subscriber = { version = "0.3.19", default-features = false, features =
"json",
"env-filter",
] }
subprocess = { version = "0.2.9" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was last released 3 years ago. I'd prefer if we could just rawdog the std library for process management, I find it sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was primarily using subprocess as I found that it allows for a way to perform graceful termination of the processes which allows them to flush their stdout and stderr buffers to the files (std library doesn't allow for graceful termination at the moment and I encountered cases where the flushing never happens if we just kill the process).

I think another alternative would be for us to use the standard library for process management and then have some OS specific logic for terminating the processes (which is slightly more overhead). It could be something like:

#[cfg(target_os = "windows")]
fn shutdown() {
    // Code that uses the winapis to perform the graceful termination
}

#[cfg(target_family = "unix")]
fn shutdown() {
    // Code that uses the unix APIs to perform the graceful termination
}

So it seemed better to use subprocess since we need the graceful termination for the logs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be just a file handle which can be flushed.

On that note, should probably be wrapped in a bufwriter too.


# revive compiler
revive-solc-json-interface = { git = "https://github.com/paritytech/revive", rev = "3389865af7c3ff6f29a586d82157e8bc573c1a8e" }
Expand Down
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version.workspace = true
anyhow = { workspace = true }
alloy = { workspace = true }
tracing = { workspace = true }
subprocess = { workspace = true }

revive-dt-node-interaction = { workspace = true }
revive-dt-config = { workspace = true }
Expand Down
119 changes: 79 additions & 40 deletions crates/node/src/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

use std::{
collections::HashMap,
fs::{File, create_dir_all, remove_dir_all},
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
io::{BufRead, BufReader, Read, Write},
path::PathBuf,
process::{Child, Command, Stdio},
process::{Command, Stdio},
sync::{
Mutex,
atomic::{AtomicU32, Ordering},
},
thread,
time::{Duration, Instant},
};

Expand All @@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
transaction::execute_transaction,
};
use subprocess::{Exec, Popen};

use crate::Node;

Expand All @@ -47,7 +47,7 @@ pub struct Instance {
data_directory: PathBuf,
geth: PathBuf,
id: u32,
handle: Option<Child>,
handle: Option<Popen>,
network_id: u64,
start_timeout: u64,
wallet: EthereumWallet,
Expand All @@ -65,8 +65,10 @@ impl Instance {
const ERROR_MARKER: &str = "Fatal:";

/// Create the node directory and call `geth init` to configure the genesis.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> {
create_dir_all(&self.base_directory)?;
create_dir_all(self.base_directory.join("logs"))?;

let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE);
File::create(&genesis_path)?.write_all(genesis.as_bytes())?;
Expand Down Expand Up @@ -96,9 +98,19 @@ impl Instance {

/// Spawn the go-ethereum node child process.
///
/// [Instance::init] must be called priorly.
/// [Instance::init] must be called prior.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
self.handle = Command::new(&self.geth)
let node_logs_file_path = self.base_directory.join("logs").join("node.log");
let node_logs_file = OpenOptions::new()
// Options to re-create and re-write to the file starting at offset zero. We do not want
// to re-use log files between runs. Users that want to keep their log files should pass
// in a different working directory between runs.
.create(true)
.truncate(true)
.write(true)
.open(&node_logs_file_path)?;
self.handle = Exec::cmd(&self.geth)
.arg("--dev")
.arg("--datadir")
.arg(&self.data_directory)
Expand All @@ -109,49 +121,58 @@ impl Instance {
.arg("--nodiscover")
.arg("--maxpeers")
.arg("0")
.stderr(Stdio::piped())
.stdout(Stdio::null())
.spawn()?
// We pipe both stdout and stderr to the same log file and therefore we're persisting
// both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure
// that both [`std::fs::File`] objects have the same seeks and offsets and therefore we
// don't have to worry about either streams overriding each other.
.stderr(node_logs_file.try_clone()?)
.stdout(node_logs_file)
.popen()?
.into();

if let Err(error) = self.wait_ready() {
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
self.shutdown()?;
return Err(error);
}

Ok(self)
}

/// Wait for the g-ethereum node child process getting ready.
///
/// [Instance::spawn_process] must be called priorly.
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
// Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
#[allow(clippy::zombie_processes)]
let mut child = self.handle.take().expect("should be spawned");
let start_time = Instant::now();
let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines();
let error = loop {
let Some(Ok(line)) = stderr.next() else {
break "child process stderr reading error".to_string();
};
if line.contains(Self::ERROR_MARKER) {
break line;
}
if line.contains(Self::READY_MARKER) {
// Keep stderr alive
// https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
thread::spawn(move || for _ in stderr.by_ref() {});

self.handle = child.into();
return Ok(self);
let logs_file = OpenOptions::new()
.read(true)
.write(false)
.append(false)
.truncate(false)
.open(self.base_directory.join("logs").join("node.log"))?;

let maximum_wait_time = Duration::from_millis(self.start_timeout);
let mut stderr = BufReader::new(logs_file).lines();
loop {
if let Some(Ok(line)) = stderr.next() {
if line.contains(Self::ERROR_MARKER) {
anyhow::bail!("Failed to start geth {line}");
}
if line.contains(Self::READY_MARKER) {
return Ok(self);
}
}
if Instant::now().duration_since(start_time) > maximum_wait_time {
break "spawn timeout".to_string();
anyhow::bail!("Timeout in starting geth");
}
};

let _ = child.kill();
anyhow::bail!("geth node #{} spawn error: {error}", self.id)
}
}
}

impl EthereumNode for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn execute_transaction(
&self,
transaction: TransactionRequest,
Expand All @@ -173,6 +194,7 @@ impl EthereumNode for Instance {
}))
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn trace_transaction(
&self,
transaction: TransactionReceipt,
Expand All @@ -195,6 +217,7 @@ impl EthereumNode for Instance {
}))
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn state_diff(
&self,
transaction: alloy::rpc::types::TransactionReceipt,
Expand All @@ -208,6 +231,7 @@ impl EthereumNode for Instance {
}
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
let connection_string = self.connection_string.clone();
let wallet = self.wallet.clone();
Expand Down Expand Up @@ -242,19 +266,38 @@ impl Node for Instance {
}
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn connection_string(&self) -> String {
self.connection_string.clone()
}

fn shutdown(self) -> anyhow::Result<()> {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn shutdown(&mut self) -> anyhow::Result<()> {
// Terminate the processes in a graceful manner to allow for the output to be flushed.
if let Some(mut child) = self.handle.take() {
child.terminate().map_err(|error| {
anyhow::anyhow!("Failed to terminate the geth process: {error:?}")
})?;
child.wait().map_err(|error| {
anyhow::anyhow!("Failed to wait for the termination of the geth process: {error:?}")
})?;
}

// Remove the node's database so that subsequent runs do not run on the same database. We
// ignore the error just in case the directory didn't exist in the first place and therefore
// there's nothing to be deleted.
let _ = remove_dir_all(self.base_directory.join("data"));

Ok(())
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
self.init(genesis)?.spawn_process()?.wait_ready()?;
self.init(genesis)?.spawn_process()?;
Ok(())
}

#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn version(&self) -> anyhow::Result<String> {
let output = Command::new(&self.geth)
.arg("--version")
Expand All @@ -269,13 +312,9 @@ impl Node for Instance {
}

impl Drop for Instance {
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
fn drop(&mut self) {
if let Some(child) = self.handle.as_mut() {
let _ = child.kill();
}
if self.base_directory.exists() {
let _ = remove_dir_all(&self.base_directory);
}
self.shutdown().expect("Failed to shutdown")
}
}

Expand Down
Loading
Loading