22
33use std:: {
44 collections:: HashMap ,
5- fs:: { File , create_dir_all , remove_dir_all } ,
5+ fs:: { File , OpenOptions , create_dir_all } ,
66 io:: { BufRead , BufReader , Read , Write } ,
77 path:: PathBuf ,
8- process:: { Child , Command , Stdio } ,
8+ process:: { Command , Stdio } ,
99 sync:: {
1010 Mutex ,
1111 atomic:: { AtomicU32 , Ordering } ,
1212 } ,
13- thread,
1413 time:: { Duration , Instant } ,
1514} ;
1615
@@ -28,6 +27,7 @@ use revive_dt_node_interaction::{
2827 EthereumNode , nonce:: fetch_onchain_nonce, trace:: trace_transaction,
2928 transaction:: execute_transaction,
3029} ;
30+ use subprocess:: { Exec , Popen } ;
3131
3232use crate :: Node ;
3333
@@ -47,7 +47,7 @@ pub struct Instance {
4747 data_directory : PathBuf ,
4848 geth : PathBuf ,
4949 id : u32 ,
50- handle : Option < Child > ,
50+ handle : Option < Popen > ,
5151 network_id : u64 ,
5252 start_timeout : u64 ,
5353 wallet : EthereumWallet ,
@@ -65,8 +65,10 @@ impl Instance {
6565 const ERROR_MARKER : & str = "Fatal:" ;
6666
6767 /// Create the node directory and call `geth init` to configure the genesis.
68+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
6869 fn init ( & mut self , genesis : String ) -> anyhow:: Result < & mut Self > {
6970 create_dir_all ( & self . base_directory ) ?;
71+ create_dir_all ( & self . base_directory . join ( "logs" ) ) ?;
7072
7173 let genesis_path = self . base_directory . join ( Self :: GENESIS_JSON_FILE ) ;
7274 File :: create ( & genesis_path) ?. write_all ( genesis. as_bytes ( ) ) ?;
@@ -96,9 +98,19 @@ impl Instance {
9698
9799 /// Spawn the go-ethereum node child process.
98100 ///
99- /// [Instance::init] must be called priorly.
101+ /// [Instance::init] must be called prior.
102+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
100103 fn spawn_process ( & mut self ) -> anyhow:: Result < & mut Self > {
101- self . handle = Command :: new ( & self . geth )
104+ let node_logs_file_path = self . base_directory . join ( "logs" ) . join ( "node.log" ) ;
105+ let node_logs_file = OpenOptions :: new ( )
106+ // Options to re-create and re-write to the file starting at offset zero. We do not want
107+ // to re-use log files between runs. Users that want to keep their log files should pass
108+ // in a different working directory between runs.
109+ . create ( true )
110+ . truncate ( true )
111+ . write ( true )
112+ . open ( & node_logs_file_path) ?;
113+ self . handle = Exec :: cmd ( & self . geth )
102114 . arg ( "--dev" )
103115 . arg ( "--datadir" )
104116 . arg ( & self . data_directory )
@@ -109,49 +121,58 @@ impl Instance {
109121 . arg ( "--nodiscover" )
110122 . arg ( "--maxpeers" )
111123 . arg ( "0" )
112- . stderr ( Stdio :: piped ( ) )
113- . stdout ( Stdio :: null ( ) )
114- . spawn ( ) ?
124+ // We pipe both stdout and stderr to the same log file and therefore we're persisting
125+ // both. In the implementation of [`std::fs::File`] the `try_clone` method will ensure
126+ // that both [`std::fs::File`] objects have the same seeks and offsets and therefore we
127+ // don't have to worry about either streams overriding each other.
128+ . stderr ( node_logs_file. try_clone ( ) ?)
129+ . stdout ( node_logs_file)
130+ . popen ( ) ?
115131 . into ( ) ;
132+
133+ if let Err ( error) = self . wait_ready ( ) {
134+ tracing:: error!( ?error, "Failed to start geth, shutting down gracefully" ) ;
135+ self . shutdown ( ) ?;
136+ return Err ( error) ;
137+ }
138+
116139 Ok ( self )
117140 }
118141
119142 /// Wait for the g-ethereum node child process getting ready.
120143 ///
121144 /// [Instance::spawn_process] must be called priorly.
145+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
122146 fn wait_ready ( & mut self ) -> anyhow:: Result < & mut Self > {
123- // Thanks clippy but geth is a server; we don't `wait` but eventually kill it.
124- #[ allow( clippy:: zombie_processes) ]
125- let mut child = self . handle . take ( ) . expect ( "should be spawned" ) ;
126147 let start_time = Instant :: now ( ) ;
127- let maximum_wait_time = Duration :: from_millis ( self . start_timeout ) ;
128- let mut stderr = BufReader :: new ( child. stderr . take ( ) . expect ( "should be piped" ) ) . lines ( ) ;
129- let error = loop {
130- let Some ( Ok ( line) ) = stderr. next ( ) else {
131- break "child process stderr reading error" . to_string ( ) ;
132- } ;
133- if line. contains ( Self :: ERROR_MARKER ) {
134- break line;
135- }
136- if line. contains ( Self :: READY_MARKER ) {
137- // Keep stderr alive
138- // https://github.com/alloy-rs/alloy/issues/2091#issuecomment-2676134147
139- thread:: spawn ( move || for _ in stderr. by_ref ( ) { } ) ;
140148
141- self . handle = child. into ( ) ;
142- return Ok ( self ) ;
149+ let logs_file = OpenOptions :: new ( )
150+ . read ( true )
151+ . write ( false )
152+ . append ( false )
153+ . truncate ( false )
154+ . open ( self . base_directory . join ( "logs" ) . join ( "node.log" ) ) ?;
155+
156+ let maximum_wait_time = Duration :: from_millis ( self . start_timeout ) ;
157+ let mut stderr = BufReader :: new ( logs_file) . lines ( ) ;
158+ loop {
159+ if let Some ( Ok ( line) ) = stderr. next ( ) {
160+ if line. contains ( Self :: ERROR_MARKER ) {
161+ anyhow:: bail!( "Failed to start geth {line}" ) ;
162+ }
163+ if line. contains ( Self :: READY_MARKER ) {
164+ return Ok ( self ) ;
165+ }
143166 }
144167 if Instant :: now ( ) . duration_since ( start_time) > maximum_wait_time {
145- break "spawn timeout" . to_string ( ) ;
168+ anyhow :: bail! ( "Timeout in starting geth" ) ;
146169 }
147- } ;
148-
149- let _ = child. kill ( ) ;
150- anyhow:: bail!( "geth node #{} spawn error: {error}" , self . id)
170+ }
151171 }
152172}
153173
154174impl EthereumNode for Instance {
175+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
155176 fn execute_transaction (
156177 & self ,
157178 transaction : TransactionRequest ,
@@ -173,6 +194,7 @@ impl EthereumNode for Instance {
173194 } ) )
174195 }
175196
197+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
176198 fn trace_transaction (
177199 & self ,
178200 transaction : TransactionReceipt ,
@@ -195,6 +217,7 @@ impl EthereumNode for Instance {
195217 } ) )
196218 }
197219
220+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
198221 fn state_diff (
199222 & self ,
200223 transaction : alloy:: rpc:: types:: TransactionReceipt ,
@@ -208,6 +231,7 @@ impl EthereumNode for Instance {
208231 }
209232 }
210233
234+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
211235 fn fetch_add_nonce ( & self , address : Address ) -> anyhow:: Result < u64 > {
212236 let connection_string = self . connection_string . clone ( ) ;
213237 let wallet = self . wallet . clone ( ) ;
@@ -242,19 +266,31 @@ impl Node for Instance {
242266 }
243267 }
244268
269+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
245270 fn connection_string ( & self ) -> String {
246271 self . connection_string . clone ( )
247272 }
248273
249- fn shutdown ( self ) -> anyhow:: Result < ( ) > {
274+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
275+ fn shutdown ( & mut self ) -> anyhow:: Result < ( ) > {
276+ if let Some ( mut child) = self . handle . take ( ) {
277+ child. terminate ( ) . map_err ( |error| {
278+ anyhow:: anyhow!( "Failed to terminate the geth process: {error:?}" )
279+ } ) ?;
280+ child. wait ( ) . map_err ( |error| {
281+ anyhow:: anyhow!( "Failed to wait for the termination of the geth process: {error:?}" )
282+ } ) ?;
283+ }
250284 Ok ( ( ) )
251285 }
252286
287+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
253288 fn spawn ( & mut self , genesis : String ) -> anyhow:: Result < ( ) > {
254- self . init ( genesis) ?. spawn_process ( ) ?. wait_ready ( ) ? ;
289+ self . init ( genesis) ?. spawn_process ( ) ?;
255290 Ok ( ( ) )
256291 }
257292
293+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
258294 fn version ( & self ) -> anyhow:: Result < String > {
259295 let output = Command :: new ( & self . geth )
260296 . arg ( "--version" )
@@ -269,13 +305,9 @@ impl Node for Instance {
269305}
270306
271307impl Drop for Instance {
308+ #[ tracing:: instrument( skip_all, fields( geth_node_id = self . id) ) ]
272309 fn drop ( & mut self ) {
273- if let Some ( child) = self . handle . as_mut ( ) {
274- let _ = child. kill ( ) ;
275- }
276- if self . base_directory . exists ( ) {
277- let _ = remove_dir_all ( & self . base_directory ) ;
278- }
310+ self . shutdown ( ) . expect ( "Failed to shutdown" )
279311 }
280312}
281313
0 commit comments