Skip to content

Commit c471db4

Browse files
committed
chore: rewrite movement aptos embedded runner.
1 parent 4fb7c2d commit c471db4

8 files changed

Lines changed: 1221 additions & 66 deletions

File tree

Cargo.lock

Lines changed: 940 additions & 65 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ members = [
1818
"checks/e2e/citeria/*",
1919
# util
2020
"util/movement/*",
21+
"util/movement-aptos/*",
2122

2223
]
2324

@@ -108,7 +109,8 @@ aptos-db = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2
108109
aptos_schemadb = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2941ee2b5876ab93cfb8010154c9466d7d593ed9" }
109110
aptos-config = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2941ee2b5876ab93cfb8010154c9466d7d593ed9" }
110111
aptos-db-indexer = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2941ee2b5876ab93cfb8010154c9466d7d593ed9" }
111-
# model checking and verification
112+
aptos-node = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2941ee2b5876ab93cfb8010154c9466d7d593ed9" }
113+
aptos-cached-packages = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "2941ee2b5876ab93cfb8010154c9466d7d593ed9" }
112114

113115

114116
# secure-signing
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
[package]
2+
name = "movement-aptos-core"
3+
version = { workspace = true }
4+
edition = { workspace = true }
5+
license = { workspace = true }
6+
authors = { workspace = true }
7+
homepage = { workspace = true }
8+
publish = { workspace = true }
9+
rust-version = { workspace = true }
10+
11+
[dependencies]
12+
thiserror = { workspace = true }
13+
serde = { workspace = true }
14+
anyhow = { workspace = true }
15+
hex = { workspace = true }
16+
syncador = { workspace = true }
17+
tokio = { workspace = true }
18+
aptos-node = { workspace = true }
19+
aptos-config = { workspace = true }
20+
serde_json = { workspace = true }
21+
clap = { workspace = true }
22+
jsonlvar = { workspace = true }
23+
orfile = { workspace = true }
24+
kestrel = { workspace = true }
25+
reqwest = { workspace = true }
26+
[dev-dependencies]
27+
uuid = { workspace = true }
28+
chrono = { workspace = true }
29+
aptos-cached-packages = { workspace = true }
30+
rand = { workspace = true }
31+
32+
[lints]
33+
workspace = true
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# `movement-core`
2+
The core lib for running an embedded Movement Full Node.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use aptos_config::config::NodeConfig;
2+
use clap::Parser;
3+
use jsonlvar::Jsonl;
4+
use orfile::Orfile;
5+
use serde::{Deserialize, Serialize};
6+
use std::path::PathBuf;
7+
use std::str::FromStr;
8+
9+
use crate::movement_aptos::MovementAptos;
10+
11+
#[derive(Debug, Serialize, Deserialize, Clone)]
12+
pub struct NodeConfigWrapper(NodeConfig);
13+
14+
impl FromStr for NodeConfigWrapper {
15+
type Err = ConfigError;
16+
17+
fn from_str(s: &str) -> Result<Self, Self::Err> {
18+
// if "default" return the default config
19+
if s == "default" {
20+
return Ok(NodeConfigWrapper(NodeConfig::default()));
21+
}
22+
23+
// otherwise, parse the json
24+
let node_config: NodeConfig =
25+
serde_json::from_str(s).map_err(|e| ConfigError::Internal(e.into()))?;
26+
Ok(NodeConfigWrapper(node_config))
27+
}
28+
}
29+
30+
/// Errors thrown when parsing an [Eth] network.
31+
#[derive(Debug, thiserror::Error)]
32+
pub enum ConfigError {
33+
#[error("movment-core Config encountered an internal error: {0}")]
34+
Internal(#[source] Box<dyn std::error::Error + Send + Sync>),
35+
}
36+
37+
#[derive(Parser, Debug, Serialize, Deserialize, Clone, Jsonl, Orfile)]
38+
#[clap(help_expected = true)]
39+
pub struct Config {
40+
/// The node config to use.
41+
#[clap(long)]
42+
pub node_config: NodeConfigWrapper,
43+
44+
/// The log file to use.
45+
#[orfile(config)]
46+
#[clap(long)]
47+
pub log_file: Option<PathBuf>,
48+
49+
/// Whether to create a global rayon pool.
50+
#[orfile(config)]
51+
#[clap(long)]
52+
pub create_global_rayon_pool: bool,
53+
}
54+
55+
impl Config {
56+
/// Builds the config into a [MovementAptos] runner.
57+
pub fn build(&self) -> Result<MovementAptos, ConfigError> {
58+
Ok(MovementAptos::new(
59+
self.node_config.0.clone(),
60+
self.log_file.clone(),
61+
self.create_global_rayon_pool,
62+
))
63+
}
64+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod config;
2+
pub mod movement_aptos;
3+
4+
pub use config::*;
5+
pub use movement_aptos::*;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use aptos_config::config::NodeConfig;
2+
use kestrel::State;
3+
use std::path::PathBuf;
4+
pub mod rest_api;
5+
use kestrel::fulfill::custom::Custom;
6+
pub use rest_api::RestApi;
7+
use rest_api::WaitForRestApi;
8+
9+
/// Errors thrown when running [MovementAptos].
10+
#[derive(Debug, thiserror::Error)]
11+
pub enum MovementAptosError {
12+
#[error("Movement Aptos failed to run with error: {0}")]
13+
Internal(#[source] Box<dyn std::error::Error + Send + Sync>),
14+
}
15+
16+
#[derive(Clone)]
17+
pub struct MovementAptos {
18+
/// The [NodeConfig] for the Aptos node.
19+
pub node_config: NodeConfig,
20+
/// The path to the log file.
21+
pub log_file: Option<PathBuf>,
22+
/// Whether to create a global rayon pool.
23+
pub create_global_rayon_pool: bool,
24+
/// The [MovementAptosRestApi] for the Aptos node.
25+
pub rest_api: State<RestApi>,
26+
}
27+
28+
impl MovementAptos {
29+
pub fn new(
30+
node_config: NodeConfig,
31+
log_file: Option<PathBuf>,
32+
create_global_rayon_pool: bool,
33+
) -> Self {
34+
Self { node_config, log_file, create_global_rayon_pool, rest_api: State::new() }
35+
}
36+
37+
/// Borrow the rest api state
38+
pub fn rest_api(&self) -> &State<RestApi> {
39+
&self.rest_api
40+
}
41+
42+
/// Runs the internal node logic
43+
pub(crate) fn run_node(&self) -> Result<(), MovementAptosError> {
44+
aptos_node::start(
45+
self.node_config.clone(),
46+
self.log_file.clone(),
47+
self.create_global_rayon_pool,
48+
)
49+
.map_err(|e| MovementAptosError::Internal(e.into()))?;
50+
51+
Ok(())
52+
}
53+
54+
/// Runs the node and fills state.
55+
pub async fn run(&self) -> Result<(), MovementAptosError> {
56+
let rest_api = RestApi {
57+
rest_api_url: format!(
58+
"http://{}:{}",
59+
self.node_config.api.address.ip(),
60+
self.node_config.api.address.port()
61+
),
62+
};
63+
64+
let runner = self.clone();
65+
let runner_task = kestrel::task(async move {
66+
runner.run_node()?;
67+
Ok::<_, MovementAptosError>(())
68+
});
69+
70+
// rest api state
71+
let rest_api_state = self.rest_api.clone();
72+
let rest_api_polling = kestrel::task(async move {
73+
loop {
74+
// wait for the rest api to be ready
75+
let response = reqwest::get(rest_api.rest_api_url.clone())
76+
.await
77+
.map_err(|e| MovementAptosError::Internal(e.into()))?;
78+
if response.status().is_success() {
79+
rest_api_state.write().set(rest_api).await;
80+
break;
81+
}
82+
}
83+
84+
Ok::<_, MovementAptosError>(())
85+
});
86+
87+
// await the runner
88+
runner_task.await.map_err(|e| MovementAptosError::Internal(e.into()))??;
89+
rest_api_polling.await.map_err(|e| MovementAptosError::Internal(e.into()))??;
90+
91+
Ok(())
92+
}
93+
}
94+
95+
#[cfg(test)]
96+
mod tests {
97+
use super::*;
98+
use aptos_node::create_single_node_test_config;
99+
use rand::thread_rng;
100+
use std::path::Path;
101+
102+
#[tokio::test]
103+
async fn test_movement_aptos() -> Result<(), anyhow::Error> {
104+
// open in a new db
105+
let unique_id = uuid::Uuid::new_v4();
106+
let timestamp = chrono::Utc::now().timestamp_millis();
107+
let db_dir = Path::new(".debug").join(format!(
108+
"movement-aptos-db-{}-{}",
109+
timestamp,
110+
unique_id.to_string().split('-').next().unwrap()
111+
));
112+
113+
// create parent dirs
114+
std::fs::create_dir_all(db_dir.clone())?;
115+
116+
let rng = thread_rng();
117+
let node_config = create_single_node_test_config(
118+
&None,
119+
&None,
120+
db_dir.as_path(),
121+
true,
122+
false,
123+
true,
124+
&aptos_cached_packages::head_release_bundle().clone(),
125+
rng,
126+
)?;
127+
128+
let movement_aptos = MovementAptos::new(node_config, None, false);
129+
let rest_api_state = movement_aptos.rest_api().read().clone();
130+
movement_aptos.run().await?;
131+
rest_api_state.wait_for(tokio::time::Duration::from_secs(30)).await?;
132+
133+
Ok(())
134+
}
135+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use kestrel::{fulfill::custom::CustomProcessor, fulfill::FulfillError};
2+
use tokio::sync::mpsc::Receiver;
3+
4+
#[derive(Debug, Clone)]
5+
pub struct RestApi {
6+
/// The Rest Api url.
7+
pub rest_api_url: String,
8+
}
9+
10+
pub struct WaitForRestApi {
11+
/// The expected [RestApi]
12+
pub rest_api: RestApi,
13+
}
14+
15+
impl WaitForRestApi {
16+
pub fn new(rest_api: RestApi) -> Self {
17+
Self { rest_api }
18+
}
19+
}
20+
21+
impl CustomProcessor<RestApi> for WaitForRestApi {
22+
async fn process_receiver(
23+
&self,
24+
receiver: &mut Receiver<String>,
25+
) -> Result<Option<RestApi>, FulfillError> {
26+
loop {
27+
// use the receiver to pace the polling
28+
receiver.recv().await;
29+
30+
// keep curling the rest api until the faucet is ready
31+
let response = reqwest::get(self.rest_api.rest_api_url.clone())
32+
.await
33+
.map_err(|e| FulfillError::Internal(e.into()))?;
34+
if response.status().is_success() {
35+
return Ok(Some(self.rest_api.clone()));
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)