Skip to content

Commit b204de5

Browse files
authored
Persist node logs (#36)
* Persist node logs * Fix clippy lints * Delete the node's db on shutdown but persist logs * Fix tests * Separate stdout and stderr and use more consts. * More consistent handling of open options * Revert the use of subprocess * Remove outdated comment * Flush the log files on drop * Rename `log_files` -> `logs_file_to_flush`
1 parent 5eb3a0e commit b204de5

File tree

3 files changed

+253
-75
lines changed

3 files changed

+253
-75
lines changed

crates/node/src/geth.rs

Lines changed: 105 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@
22
33
use std::{
44
collections::HashMap,
5-
fs::{File, create_dir_all, remove_dir_all},
5+
fs::{File, OpenOptions, create_dir_all, remove_dir_all},
66
io::{BufRead, BufReader, Read, Write},
77
path::PathBuf,
88
process::{Child, Command, Stdio},
99
sync::{
1010
Mutex,
1111
atomic::{AtomicU32, Ordering},
1212
},
13-
thread,
1413
time::{Duration, Instant},
1514
};
1615

@@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
2827
EthereumNode, nonce::fetch_onchain_nonce, trace::trace_transaction,
2928
transaction::execute_transaction,
3029
};
30+
use tracing::Level;
3131

3232
use crate::Node;
3333

@@ -45,28 +45,40 @@ pub struct Instance {
4545
connection_string: String,
4646
base_directory: PathBuf,
4747
data_directory: PathBuf,
48+
logs_directory: PathBuf,
4849
geth: PathBuf,
4950
id: u32,
5051
handle: Option<Child>,
5152
network_id: u64,
5253
start_timeout: u64,
5354
wallet: EthereumWallet,
5455
nonces: Mutex<HashMap<Address, u64>>,
56+
/// This vector stores [`File`] objects that we use for logging which we want to flush when the
57+
/// node object is dropped. We do not store them in a structured fashion at the moment (in
58+
/// separate fields) as the logic that we need to apply to them is all the same regardless of
59+
/// what it belongs to, we just want to flush them on [`Drop`] of the node.
60+
logs_file_to_flush: Vec<File>,
5561
}
5662

5763
impl Instance {
5864
const BASE_DIRECTORY: &str = "geth";
5965
const DATA_DIRECTORY: &str = "data";
66+
const LOGS_DIRECTORY: &str = "logs";
6067

6168
const IPC_FILE: &str = "geth.ipc";
6269
const GENESIS_JSON_FILE: &str = "genesis.json";
6370

6471
const READY_MARKER: &str = "IPC endpoint opened";
6572
const ERROR_MARKER: &str = "Fatal:";
6673

74+
const GETH_STDOUT_LOG_FILE_NAME: &str = "node_stdout.log";
75+
const GETH_STDERR_LOG_FILE_NAME: &str = "node_stderr.log";
76+
6777
/// Create the node directory and call `geth init` to configure the genesis.
78+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
6879
fn init(&mut self, genesis: String) -> anyhow::Result<&mut Self> {
6980
create_dir_all(&self.base_directory)?;
81+
create_dir_all(&self.logs_directory)?;
7082

7183
let genesis_path = self.base_directory.join(Self::GENESIS_JSON_FILE);
7284
File::create(&genesis_path)?.write_all(genesis.as_bytes())?;
@@ -96,8 +108,24 @@ impl Instance {
96108

97109
/// Spawn the go-ethereum node child process.
98110
///
99-
/// [Instance::init] must be called priorly.
111+
/// [Instance::init] must be called prior.
112+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
100113
fn spawn_process(&mut self) -> anyhow::Result<&mut Self> {
114+
// This is the `OpenOptions` that we wish to use for all of the log files that we will be
115+
// opening in this method. We need to construct it in this way to:
116+
// 1. Be consistent
117+
// 2. Less verbose and more dry
118+
// 3. Because the builder pattern uses mutable references so we need to get around that.
119+
let open_options = {
120+
let mut options = OpenOptions::new();
121+
options.create(true).truncate(true).write(true);
122+
options
123+
};
124+
125+
let stdout_logs_file = open_options
126+
.clone()
127+
.open(self.geth_stdout_log_file_path())?;
128+
let stderr_logs_file = open_options.open(self.geth_stderr_log_file_path())?;
101129
self.handle = Command::new(&self.geth)
102130
.arg("--dev")
103131
.arg("--datadir")
@@ -109,49 +137,67 @@ impl Instance {
109137
.arg("--nodiscover")
110138
.arg("--maxpeers")
111139
.arg("0")
112-
.stderr(Stdio::piped())
113-
.stdout(Stdio::null())
140+
.stderr(stderr_logs_file.try_clone()?)
141+
.stdout(stdout_logs_file.try_clone()?)
114142
.spawn()?
115143
.into();
144+
145+
if let Err(error) = self.wait_ready() {
146+
tracing::error!(?error, "Failed to start geth, shutting down gracefully");
147+
self.shutdown()?;
148+
return Err(error);
149+
}
150+
151+
self.logs_file_to_flush
152+
.extend([stderr_logs_file, stdout_logs_file]);
153+
116154
Ok(self)
117155
}
118156

119157
/// Wait for the g-ethereum node child process getting ready.
120158
///
121159
/// [Instance::spawn_process] must be called priorly.
160+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
122161
fn wait_ready(&mut self) -> anyhow::Result<&mut Self> {
123-
// Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
124-
#[allow(clippy::zombie_processes)]
125-
let mut child = self.handle.take().expect("should be spawned");
126162
let start_time = Instant::now();
127-
let maximum_wait_time = Duration::from_millis(self.start_timeout);
128-
let mut stderr = BufReader::new(child.stderr.take().expect("should be piped")).lines();
129-
let error = loop {
130-
let Some(Ok(line)) = stderr.next() else {
131-
break "child process stderr reading error".to_string();
132-
};
133-
if line.contains(Self::ERROR_MARKER) {
134-
break line;
135-
}
136-
if line.contains(Self::READY_MARKER) {
137-
// Keep stderr alive
138-
// https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
139-
thread::spawn(move || for _ in stderr.by_ref() {});
140163

141-
self.handle = child.into();
142-
return Ok(self);
164+
let logs_file = OpenOptions::new()
165+
.read(true)
166+
.write(false)
167+
.append(false)
168+
.truncate(false)
169+
.open(self.geth_stderr_log_file_path())?;
170+
171+
let maximum_wait_time = Duration::from_millis(self.start_timeout);
172+
let mut stderr = BufReader::new(logs_file).lines();
173+
loop {
174+
if let Some(Ok(line)) = stderr.next() {
175+
if line.contains(Self::ERROR_MARKER) {
176+
anyhow::bail!("Failed to start geth {line}");
177+
}
178+
if line.contains(Self::READY_MARKER) {
179+
return Ok(self);
180+
}
143181
}
144182
if Instant::now().duration_since(start_time) > maximum_wait_time {
145-
break "spawn timeout".to_string();
183+
anyhow::bail!("Timeout in starting geth");
146184
}
147-
};
185+
}
186+
}
148187

149-
let _ = child.kill();
150-
anyhow::bail!("geth node #{} spawn error: {error}", self.id)
188+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
189+
fn geth_stdout_log_file_path(&self) -> PathBuf {
190+
self.logs_directory.join(Self::GETH_STDOUT_LOG_FILE_NAME)
191+
}
192+
193+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id), level = Level::TRACE)]
194+
fn geth_stderr_log_file_path(&self) -> PathBuf {
195+
self.logs_directory.join(Self::GETH_STDERR_LOG_FILE_NAME)
151196
}
152197
}
153198

154199
impl EthereumNode for Instance {
200+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
155201
fn execute_transaction(
156202
&self,
157203
transaction: TransactionRequest,
@@ -241,6 +287,7 @@ impl EthereumNode for Instance {
241287
}))
242288
}
243289

290+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
244291
fn trace_transaction(
245292
&self,
246293
transaction: TransactionReceipt,
@@ -263,6 +310,7 @@ impl EthereumNode for Instance {
263310
}))
264311
}
265312

313+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
266314
fn state_diff(
267315
&self,
268316
transaction: alloy::rpc::types::TransactionReceipt,
@@ -276,6 +324,7 @@ impl EthereumNode for Instance {
276324
}
277325
}
278326

327+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
279328
fn fetch_add_nonce(&self, address: Address) -> anyhow::Result<u64> {
280329
let connection_string = self.connection_string.clone();
281330
let wallet = self.wallet.clone();
@@ -299,6 +348,7 @@ impl Node for Instance {
299348
Self {
300349
connection_string: base_directory.join(Self::IPC_FILE).display().to_string(),
301350
data_directory: base_directory.join(Self::DATA_DIRECTORY),
351+
logs_directory: base_directory.join(Self::LOGS_DIRECTORY),
302352
base_directory,
303353
geth: config.geth.clone(),
304354
id,
@@ -307,22 +357,46 @@ impl Node for Instance {
307357
start_timeout: config.geth_start_timeout,
308358
wallet: config.wallet(),
309359
nonces: Mutex::new(HashMap::new()),
360+
// We know that we only need to be storing 2 files so we can specify that when creating
361+
// the vector. It's the stdout and stderr of the geth node.
362+
logs_file_to_flush: Vec::with_capacity(2),
310363
}
311364
}
312365

366+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
313367
fn connection_string(&self) -> String {
314368
self.connection_string.clone()
315369
}
316370

317-
fn shutdown(self) -> anyhow::Result<()> {
371+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
372+
fn shutdown(&mut self) -> anyhow::Result<()> {
373+
// Terminate the processes in a graceful manner to allow for the output to be flushed.
374+
if let Some(mut child) = self.handle.take() {
375+
child
376+
.kill()
377+
.map_err(|error| anyhow::anyhow!("Failed to kill the geth process: {error:?}"))?;
378+
}
379+
380+
// Flushing the files that we're using for keeping the logs before shutdown.
381+
for file in self.logs_file_to_flush.iter_mut() {
382+
file.flush()?
383+
}
384+
385+
// Remove the node's database so that subsequent runs do not run on the same database. We
386+
// ignore the error just in case the directory didn't exist in the first place and therefore
387+
// there's nothing to be deleted.
388+
let _ = remove_dir_all(self.base_directory.join(Self::DATA_DIRECTORY));
389+
318390
Ok(())
319391
}
320392

393+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
321394
fn spawn(&mut self, genesis: String) -> anyhow::Result<()> {
322-
self.init(genesis)?.spawn_process()?.wait_ready()?;
395+
self.init(genesis)?.spawn_process()?;
323396
Ok(())
324397
}
325398

399+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
326400
fn version(&self) -> anyhow::Result<String> {
327401
let output = Command::new(&self.geth)
328402
.arg("--version")
@@ -337,14 +411,9 @@ impl Node for Instance {
337411
}
338412

339413
impl Drop for Instance {
414+
#[tracing::instrument(skip_all, fields(geth_node_id = self.id))]
340415
fn drop(&mut self) {
341-
tracing::info!(id = self.id, "Dropping node");
342-
if let Some(child) = self.handle.as_mut() {
343-
let _ = child.kill();
344-
}
345-
if self.base_directory.exists() {
346-
let _ = remove_dir_all(&self.base_directory);
347-
}
416+
self.shutdown().expect("Failed to shutdown")
348417
}
349418
}
350419

0 commit comments

Comments
 (0)