@@ -2,8 +2,11 @@ use aptos_config::config::NodeConfig;
22use kestrel:: State ;
33use std:: path:: PathBuf ;
44pub mod rest_api;
5+ pub mod runtime;
56pub use rest_api:: RestApi ;
67
8+ use runtime:: Runtime ;
9+
710/// Errors thrown when running [MovementAptos].
811#[ derive( Debug , thiserror:: Error ) ]
912pub enum MovementAptosError {
@@ -12,28 +15,41 @@ pub enum MovementAptosError {
1215}
1316
1417#[ derive( Clone ) ]
15- pub struct MovementAptos {
18+ pub struct MovementAptos < R >
19+ where
20+ R : Runtime ,
21+ {
1622 /// The [NodeConfig] for the Aptos node.
1723 pub node_config : NodeConfig ,
1824 /// The path to the log file.
1925 pub log_file : Option < PathBuf > ,
20- /// Whether to create a global rayon pool .
21- pub create_global_rayon_pool : bool ,
26+ /// The runtime for the Aptos node .
27+ pub runtime : std :: marker :: PhantomData < R > ,
2228 /// The [MovementAptosRestApi] for the Aptos node.
2329 pub rest_api : State < RestApi > ,
2430}
2531
26- impl MovementAptos {
27- pub fn new (
32+ impl < R > MovementAptos < R >
33+ where
34+ R : Runtime ,
35+ {
36+ /// If you have something that marks your ability to get a runtime, you can use this.
37+ pub fn new ( node_config : NodeConfig , log_file : Option < PathBuf > , _runtime : R ) -> Self {
38+ Self { node_config, log_file, runtime : std:: marker:: PhantomData , rest_api : State :: new ( ) }
39+ }
40+
41+ /// Checks runtime availability and creates a new [MovementAptos].
42+ pub fn try_new (
2843 node_config : NodeConfig ,
2944 log_file : Option < PathBuf > ,
30- create_global_rayon_pool : bool ,
31- ) -> Self {
32- Self { node_config, log_file, create_global_rayon_pool, rest_api : State :: new ( ) }
45+ ) -> Result < Self , anyhow:: Error > {
46+ let runtime = R :: try_new ( ) ?;
47+ let movement_aptos = MovementAptos :: new ( node_config, log_file, runtime) ;
48+ Ok ( movement_aptos)
3349 }
3450
3551 pub fn from_config ( config : NodeConfig ) -> Result < Self , anyhow:: Error > {
36- let movement_aptos = MovementAptos :: new ( config, None , false ) ;
52+ let movement_aptos = MovementAptos :: new ( config, None , R :: try_new ( ) ? ) ;
3753 Ok ( movement_aptos)
3854 }
3955
@@ -43,15 +59,37 @@ impl MovementAptos {
4359 }
4460
4561 /// Runs the internal node logic
46- pub ( crate ) fn run_node ( & self ) -> Result < ( ) , MovementAptosError > {
47- aptos_node:: start (
48- self . node_config . clone ( ) ,
49- self . log_file . clone ( ) ,
50- self . create_global_rayon_pool ,
51- )
52- . map_err ( |e| MovementAptosError :: Internal ( e. into ( ) ) ) ?;
53-
54- Ok ( ( ) )
62+ pub ( crate ) async fn run_node ( & self ) -> Result < ( ) , MovementAptosError > {
63+ // Clone necessary data for the closure
64+ let node_config = self . node_config . clone ( ) ;
65+ let log_file = self . log_file . clone ( ) ;
66+
67+ // Spawn the blocking task
68+ let blocking_task_result = tokio:: task:: spawn_blocking ( move || {
69+ // This closure runs on a blocking thread
70+ aptos_node:: start (
71+ node_config,
72+ log_file,
73+ R :: create_global_rayon_pool ( ) , // Assuming R is in scope and its result is Send
74+ )
75+ // The closure should return the direct result from aptos_node::start.
76+ // The error type from aptos_node::start (let's call it AptosNodeError)
77+ // needs to be Send + 'static for the closure.
78+ } )
79+ . await ;
80+
81+ match blocking_task_result {
82+ Ok ( Ok ( ( ) ) ) => Ok ( ( ) ) , // aptos_node::start succeeded
83+ Ok ( Err ( aptos_node_err) ) => {
84+ // aptos_node::start failed. We need aptos_node_err to be convertible
85+ // into the Box<dyn Error> for MovementAptosError::Internal.
86+ Err ( MovementAptosError :: Internal ( aptos_node_err. into ( ) ) )
87+ }
88+ Err ( join_err) => {
89+ // spawn_blocking task failed (e.g., panicked or was cancelled by Tokio)
90+ Err ( MovementAptosError :: Internal ( Box :: new ( join_err) ) )
91+ }
92+ }
5593 }
5694
5795 /// Runs the node and fills state.
@@ -66,18 +104,22 @@ impl MovementAptos {
66104
67105 let runner = self . clone ( ) ;
68106 let runner_task = kestrel:: task ( async move {
69- runner. run_node ( ) ?;
107+ runner. run_node ( ) . await ?;
70108 Ok :: < _ , MovementAptosError > ( ( ) )
71109 } ) ;
72110
73111 // rest api state
74112 let rest_api_state = self . rest_api . clone ( ) ;
75113 let rest_api_polling = kestrel:: task ( async move {
76114 loop {
115+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
116+ println ! ( "POLLING REST API: {:?}" , rest_api) ;
77117 // wait for the rest api to be ready
78118 let response = reqwest:: get ( rest_api. rest_api_url . clone ( ) )
79119 . await
80120 . map_err ( |e| MovementAptosError :: Internal ( e. into ( ) ) ) ?;
121+
122+ println ! ( "REST API RESPONSE: {:?}" , response) ;
81123 if response. status ( ) . is_success ( ) {
82124 rest_api_state. write ( ) . set ( rest_api) . await ;
83125 break ;
@@ -102,7 +144,7 @@ mod tests {
102144 use rand:: thread_rng;
103145 use std:: path:: Path ;
104146
105- #[ tokio:: test]
147+ #[ tokio:: test( flavor = "multi_thread" ) ]
106148 async fn test_movement_aptos ( ) -> Result < ( ) , anyhow:: Error > {
107149 // open in a new db
108150 let unique_id = uuid:: Uuid :: new_v4 ( ) ;
@@ -128,11 +170,20 @@ mod tests {
128170 rng,
129171 ) ?;
130172
131- let movement_aptos = MovementAptos :: new ( node_config, None , false ) ;
173+ let movement_aptos = MovementAptos :: < runtime :: TokioTest > :: try_new ( node_config, None ) ? ;
132174 let rest_api_state = movement_aptos. rest_api ( ) . read ( ) . clone ( ) ;
133- movement_aptos. run ( ) . await ?;
175+
176+ let movement_aptos_task = kestrel:: task ( async move {
177+ movement_aptos. run ( ) . await ?;
178+ Ok :: < _ , MovementAptosError > ( ( ) )
179+ } ) ;
180+
134181 rest_api_state. wait_for ( tokio:: time:: Duration :: from_secs ( 30 ) ) . await ?;
135182
183+ println ! ( "ENDING MOVEMENT APTOS" ) ;
184+
185+ kestrel:: end!( movement_aptos_task) ?;
186+
136187 Ok ( ( ) )
137188 }
138189}
0 commit comments