diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 7dfb244..972e27a 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -21,7 +21,7 @@ jobs:
       - name: Install components
         run: |
           rustup component add rustfmt clippy
-          cargo install cargo-machete --version 0.8.0
+          cargo install cargo-machete --version 0.8.0 --locked
       - name: Check format
         run: cargo fmt --all -- --check
       - name: Check clippy
@@ -51,7 +51,7 @@ jobs:
       - name: Install Foundry
         uses: foundry-rs/foundry-toolchain@v1
         with:
-          version: 'v1.0.0'
+          version: 'v1.0.0'
       - name: Install kurtosis
         run: |
           sudo apt-get install jq
diff --git a/.gitignore b/.gitignore
index e90dc53..4059f82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,4 +25,6 @@ target/
 /.data
 *.bin
 *.json
-chain
\ No newline at end of file
+chain
+.preimage
+.finalized_l1
diff --git a/Cargo.lock b/Cargo.lock
index 5b2cc9b..e0332b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4034,7 +4034,7 @@ dependencies = [
 [[package]]
 name = "optimism-preimage-maker"
-version = "0.1.3"
+version = "0.2.0"
 dependencies = [
  "alloy-primitives",
  "anyhow",
@@ -5242,9 +5242,9 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
 
 [[package]]
 name = "scc"
-version = "2.3.4"
+version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4"
+checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
 dependencies = [
  "sdd",
 ]
@@ -5290,9 +5290,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
 [[package]]
 name = "sdd"
-version = "3.0.8"
+version = "3.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "584e070911c7017da6cb2eb0788d09f43d789029b5877d3e5ecc8acf86ceee21"
+checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
 
 [[package]]
 name = "sec1"
diff --git a/Makefile b/Makefile
index 9c6be87..e82524d 100644
--- a/Makefile
+++ b/Makefile
@@ -37,6 +37,8 @@ wait:
 .PHONY: server-up
 server-up:
+	mkdir -p .preimage
+	mkdir -p .finalized_l1
 	@L2_ROLLUP_PORT=$$(jq -r '.l2RollupPort' hostPort.json);\
 	L2_GETH_PORT=$$(jq -r '.l2GethPort' hostPort.json);\
 	L1_GETH_PORT=$$(jq -r '.l1GethPort' hostPort.json);\
@@ -47,19 +49,21 @@ server-up:
 		--l2=http://localhost:$$L2_GETH_PORT \
 		--l1=http://localhost:$$L1_GETH_PORT \
 		--beacon=http://localhost:$$L1_BEACON_PORT \
-		--l1-chain-config=$$L1_CHAIN_CONFIG
+		--l1-chain-config=$$L1_CHAIN_CONFIG \
+		--initial-claimed-l2=103 \
+		--ttl=1800 \
+		--max-preimage-distance=100 \
+		--purger-interval-seconds=100
 
 .PHONY: test
 test:
 	@L2_ROLLUP_PORT=$$(jq -r '.l2RollupPort' hostPort.json);\
 	L2_GETH_PORT=$$(jq -r '.l2GethPort' hostPort.json);\
-	L2_ROLLUP_PORT=$$L2_ROLLUP_PORT L2_GETH_PORT=$$L2_GETH_PORT cargo test --manifest-path=./server/Cargo.toml
+	L2_ROLLUP_ADDR=http://localhost:$$L2_ROLLUP_PORT L2_GETH_ADDR=http://localhost:$$L2_GETH_PORT cargo test --manifest-path=./server/Cargo.toml
 
-.PHONY: test-ignored
-test-ignored:
-	@L2_ROLLUP_PORT=$$(jq -r '.l2RollupPort' hostPort.json);\
-	L2_GETH_PORT=$$(jq -r '.l2GethPort' hostPort.json);\
-	REQUEST_PATH=$(CURDIR)/tool/body.json L2_ROLLUP_PORT=$$L2_ROLLUP_PORT L2_GETH_PORT=$$L2_GETH_PORT cargo test --manifest-path=./server/Cargo.toml -- --ignored
+.PHONY: inspect
+inspect:
+	cargo test --manifest-path=./server/Cargo.toml -- --ignored
 
 .PHONY: devnet-down
 devnet-down:
diff --git a/server/Cargo.toml b/server/Cargo.toml
index bf6432f..8aa2efe 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "optimism-preimage-maker"
-version = "0.1.3"
+version = "0.2.0"
 edition = "2021"
 
 [dependencies]
@@ -15,6 +15,7 @@ tracing = { version = "0.1", default-features = false }
 tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
 axum = "0.7.9"
 tokio = { version="1.41", features=["full"] }
+base64 = "0.22.1"
 
 # Kona
 kona-client = { workspace = true }
@@ -29,7 +30,6 @@ alloy-primitives = { workspace = true, features = ["map", "serde"] }
 
 # optimism derivation
 optimism-derivation = { git = "https://github.com/datachainlab/optimism-elc", rev = "v0.1.10", default-features = false }
-base64 = "0.22.1"
 
 [dev-dependencies]
 prost = "0.11.9"
diff --git a/server/src/client/beacon_client.rs b/server/src/client/beacon_client.rs
new file mode 100644
index 0000000..e0e046b
--- /dev/null
+++ b/server/src/client/beacon_client.rs
@@ -0,0 +1,126 @@
+use alloy_primitives::B256;
+use axum::async_trait;
+use reqwest::Response;
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct LightClientFinalityUpdateResponse {
+    pub data: LightClientFinalityUpdate,
+}
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct LightClientFinalityUpdate {
+    pub finalized_header: LightClientHeader,
+}
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct LightClientHeader {
+    pub execution: ExecutionPayloadHeader,
+}
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct ExecutionPayloadHeader {
+    pub block_hash: B256,
+    #[serde(deserialize_with = "deserialize_u64_from_str")]
+    pub block_number: u64,
+}
+
+// serde helper to allow numbers encoded as strings
+fn deserialize_u64_from_str<'de, D>(deserializer: D) -> Result<u64, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    struct U64StringOrNumber;
+
+    impl<'de> serde::de::Visitor<'de> for U64StringOrNumber {
+        type Value = u64;
+
+        fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+            write!(f, "a u64 as number or string")
+        }
+
+        fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            Ok(v)
+        }
+
+        fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            if v < 0 {
+                return Err(E::custom("negative value for u64"));
+            }
+            Ok(v as u64)
+        }
+
+        fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            v.parse::<u64>()
+                .map_err(|e| E::custom(format!("invalid u64 string: {e}")))
+        }
+
+        fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            self.visit_str(&v)
+        }
+    }
+
+    deserializer.deserialize_any(U64StringOrNumber)
+}
+
+#[async_trait]
+pub trait BeaconClient: Send + Sync + 'static {
+    async fn get_raw_light_client_finality_update(&self) -> anyhow::Result<String>;
+}
+
+#[derive(Debug, Clone)]
+pub struct HttpBeaconClient {
+    beacon_addr: String,
+    client: reqwest::Client,
+}
+
+impl HttpBeaconClient {
+    pub fn new(beacon_addr: String, timeout: std::time::Duration) -> Self {
+        Self {
+            beacon_addr,
+            client: reqwest::Client::builder()
+                .timeout(timeout)
+                .build()
+                .expect("failed to build reqwest client"),
+        }
+    }
+
+    async fn check_response(&self, response: Response) -> anyhow::Result<Response> {
+        if response.status().is_success() {
+            Ok(response)
+        } else {
+            Err(anyhow::anyhow!(
+                "Request failed with status: {} body={:?}",
+                response.status(),
+                response.text().await
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl BeaconClient for HttpBeaconClient {
+    async fn get_raw_light_client_finality_update(&self) -> anyhow::Result<String> {
+        let response = self
+            .client
+            .get(format!(
+                "{}/eth/v1/beacon/light_client/finality_update",
+                self.beacon_addr
+            ))
+            .send()
+            .await?;
+        let response = self.check_response(response).await?;
+        response
+            .text()
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to get finality update: {e:?}"))
+    }
+}
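Reviewer note: the custom visitor above makes `block_number` tolerant of both JSON numbers and decimal strings, since beacon-API light-client endpoints typically encode `uint64` fields as strings. A minimal sketch of the round trip (the JSON shape mirrors the mock fixtures used in the collector tests below; the hash value is illustrative):

```rust
let raw = r#"{"data":{"finalized_header":{"execution":{
    "block_hash":"0x1111111111111111111111111111111111111111111111111111111111111111",
    "block_number":"95"}}}}"#;
// block_number arrives as the string "95" but deserializes into a u64.
let update: LightClientFinalityUpdateResponse = serde_json::from_str(raw).unwrap();
assert_eq!(update.data.finalized_header.execution.block_number, 95);
```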
diff --git a/server/src/l2_client.rs b/server/src/client/l2_client.rs
similarity index 76%
rename from server/src/l2_client.rs
rename to server/src/client/l2_client.rs
index 4107acf..42910e6 100644
--- a/server/src/l2_client.rs
+++ b/server/src/client/l2_client.rs
@@ -1,5 +1,6 @@
 use alloy_primitives::B256;
 use anyhow::Result;
+use axum::async_trait;
 use kona_genesis::RollupConfig;
 use reqwest::Response;
 use serde_json::Value;
@@ -12,11 +13,11 @@ pub struct RpcResult<T> {
 }
 
 #[derive(Debug, Clone, serde::Serialize)]
-struct RpcRequest {
-    jsonrpc: String,
-    method: String,
-    params: Vec<Value>,
-    id: i64,
+pub struct RpcRequest {
+    pub jsonrpc: String,
+    pub method: String,
+    pub params: Vec<Value>,
+    pub id: i64,
 }
 
 impl Default for RpcRequest {
@@ -91,35 +92,66 @@ pub struct Block {
     pub hash: B256,
 }
 
-pub struct L2Client {
+#[derive(Debug, Clone)]
+pub struct HttpL2Client {
     op_node_addr: String,
     op_geth_addr: String,
+    client: reqwest::Client,
+}
+
+#[async_trait]
+pub trait L2Client: Send + Sync + 'static {
+    async fn chain_id(&self) -> Result<u64>;
+    async fn rollup_config(&self) -> Result<RollupConfig>;
+    async fn sync_status(&self) -> Result<SyncStatus>;
+    async fn output_root_at(&self, number: u64) -> Result<OutputRootAtBlock>;
+    async fn get_block_by_number(&self, number: u64) -> Result<Block>;
 }
 
-impl Default for L2Client {
+impl Default for HttpL2Client {
     fn default() -> Self {
         Self::new(
             "http://localhost:7545".into(),
             "http://localhost:9545".into(),
+            std::time::Duration::from_secs(30),
         )
     }
 }
 
-impl L2Client {
-    pub fn new(op_node_addr: String, op_geth_addr: String) -> Self {
+impl HttpL2Client {
+    pub fn new(op_node_addr: String, op_geth_addr: String, timeout: std::time::Duration) -> Self {
         Self {
             op_node_addr,
             op_geth_addr,
+            client: reqwest::Client::builder()
+                .timeout(timeout)
+                .build()
+                .expect("failed to build reqwest client"),
         }
     }
 
-    pub async fn chain_id(&self) -> Result<u64> {
-        let client = reqwest::Client::new();
+    async fn check_response(&self, response: Response) -> Result<Response> {
+        if response.status().is_success() {
+            Ok(response)
+        } else {
+            Err(anyhow::anyhow!(
+                "Request failed with status: {} body={:?}",
+                response.status(),
+                response.text().await
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl L2Client for HttpL2Client {
+    async fn chain_id(&self) -> Result<u64> {
         let body = RpcRequest {
             method: "eth_chainId".into(),
             ..Default::default()
         };
-        let response = client
+        let response = self
+            .client
             .post(&self.op_geth_addr)
             .header("Content-Type", "application/json")
             .json(&body)
@@ -130,13 +162,13 @@ impl L2Client {
         Ok(u64::from_str_radix(&result.result[2..], 16)?)
     }
 
-    pub async fn rollup_config(&self) -> Result<RollupConfig> {
-        let client = reqwest::Client::new();
+    async fn rollup_config(&self) -> Result<RollupConfig> {
         let body = RpcRequest {
             method: "optimism_rollupConfig".into(),
             ..Default::default()
         };
-        let response = client
+        let response = self
+            .client
             .post(&self.op_node_addr)
             .header("Content-Type", "application/json")
             .json(&body)
@@ -146,13 +178,13 @@
         let result: RpcResult<RollupConfig> = response.json().await?;
         Ok(result.result)
     }
-    pub async fn sync_status(&self) -> Result<SyncStatus> {
-        let client = reqwest::Client::new();
+    async fn sync_status(&self) -> Result<SyncStatus> {
         let body = RpcRequest {
             method: "optimism_syncStatus".into(),
             ..Default::default()
         };
-        let response = client
+        let response = self
+            .client
             .post(&self.op_node_addr)
             .header("Content-Type", "application/json")
             .json(&body)
@@ -163,14 +195,14 @@
         Ok(result.result)
     }
 
-    pub async fn output_root_at(&self, number: u64) -> Result<OutputRootAtBlock> {
-        let client = reqwest::Client::new();
+    async fn output_root_at(&self, number: u64) -> Result<OutputRootAtBlock> {
         let body = RpcRequest {
             method: "optimism_outputAtBlock".into(),
             params: vec![format!("0x{number:X}").into()],
             ..Default::default()
         };
-        let response = client
+        let response = self
+            .client
             .post(&self.op_node_addr)
             .header("Content-Type", "application/json")
             .json(&body)
@@ -181,14 +213,14 @@
         Ok(result.result)
     }
 
-    pub async fn get_block_by_number(&self, number: u64) -> Result<Block> {
-        let client = reqwest::Client::new();
+    async fn get_block_by_number(&self, number: u64) -> Result<Block> {
         let body = RpcRequest {
             method: "eth_getBlockByNumber".into(),
             params: vec![format!("0x{number:X}").into(), false.into()],
             ..Default::default()
         };
-        let response = client
+        let response = self
+            .client
             .post(&self.op_geth_addr)
             .header("Content-Type", "application/json")
             .json(&body)
@@ -198,16 +230,4 @@
         let result: RpcResult<Block> = response.json().await?;
         Ok(result.result)
     }
-
-    async fn check_response(&self, response: Response) -> Result<Response> {
-        if response.status().is_success() {
-            Ok(response)
-        } else {
-            Err(anyhow::anyhow!(
-                "Request failed with status: {} body={:?}",
-                response.status(),
-                response.text().await
-            ))
-        }
-    }
 }
diff --git a/server/src/client/mod.rs b/server/src/client/mod.rs
new file mode 100644
index 0000000..22a3d63
--- /dev/null
+++ b/server/src/client/mod.rs
@@ -0,0 +1,2 @@
+pub mod beacon_client;
+pub mod l2_client;
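Reviewer note: a minimal sketch of how the new trait-plus-HTTP-impl split is meant to be consumed (endpoints and the surrounding async context are illustrative; generic consumers such as `PreimageCollector` below accept any `L: L2Client`, which is what lets the tests substitute mocks):

```rust
let client = HttpL2Client::new(
    "http://localhost:7545".into(), // op-node (rollup) RPC
    "http://localhost:9545".into(), // op-geth RPC
    std::time::Duration::from_secs(30),
);
let status = client.sync_status().await?;
println!("finalized L2 block: {}", status.finalized_l2.number);
```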
diff --git a/server/src/collector.rs b/server/src/collector.rs
new file mode 100644
index 0000000..a8a136c
--- /dev/null
+++ b/server/src/collector.rs
@@ -0,0 +1,951 @@
+use crate::client::beacon_client::{BeaconClient, LightClientFinalityUpdateResponse};
+use crate::client::l2_client::{L2Client, SyncStatus};
+use crate::data::finalized_l1_repository::FinalizedL1Repository;
+use crate::data::preimage_repository::{PreimageMetadata, PreimageRepository};
+use crate::derivation::host::single::handler::{Derivation, DerivationConfig, DerivationRequest};
+use alloy_primitives::B256;
+use axum::async_trait;
+use std::sync::Arc;
+use tokio::time;
+use tracing::{error, info, warn};
+
+#[async_trait]
+pub trait DerivationDriver: Send + Sync + 'static {
+    async fn drive(
+        &self,
+        config: Arc<DerivationConfig>,
+        request: DerivationRequest,
+    ) -> anyhow::Result<Vec<u8>>;
+}
+
+pub struct RealDerivationDriver;
+
+#[async_trait]
+impl DerivationDriver for RealDerivationDriver {
+    async fn drive(
+        &self,
+        config: Arc<DerivationConfig>,
+        request: DerivationRequest,
+    ) -> anyhow::Result<Vec<u8>> {
+        let derivation = Derivation { config, request };
+        derivation.start().await
+    }
+}
+
+pub struct PreimageCollector<T, F, L, B, D>
+where
+    T: PreimageRepository,
+    F: FinalizedL1Repository,
+    L: L2Client,
+    B: BeaconClient,
+    D: DerivationDriver,
+{
+    pub client: Arc<L>,
+    pub beacon_client: Arc<B>,
+    pub derivation_driver: Arc<D>,
+    pub config: Arc<DerivationConfig>,
+    pub preimage_repository: Arc<T>,
+    pub finalized_l1_repository: Arc<F>,
+    pub max_distance: u64,
+    pub max_concurrency: usize,
+    pub initial_claimed: u64,
+    pub interval_seconds: u64,
+}
+
+impl<T, F, L, B, D> PreimageCollector<T, F, L, B, D>
+where
+    T: PreimageRepository,
+    F: FinalizedL1Repository,
+    L: L2Client,
+    B: BeaconClient,
+    D: DerivationDriver,
+{
+    /// Starts the asynchronous process to continually check and collect claimed metadata.
+    ///
+    /// This function retrieves the latest claimed metadata from the `preimage_repository`.
+    /// It then enters an infinite loop where it performs the following tasks:
+    /// 1. Collects metadata using the `collect` method with the `latest_l2` value.
+    /// 2. Updates the `latest_l2` value if new metadata is claimed.
+    /// 3. Waits for a specified interval (defined by `self.interval_seconds`) before repeating the process.
+    ///
+    /// The loop runs indefinitely. To stop it, external cancellation or shutdown logic should be
+    /// implemented, such as utilizing `tokio::task::JoinHandle` or a cancellation token.
+    pub async fn start(&self) {
+        let mut latest_l2: u64 = match self.preimage_repository.latest_metadata().await {
+            Some(metadata) => metadata.claimed,
+            None => self.initial_claimed,
+        };
+        loop {
+            if let Some(claimed) = self.collect(latest_l2).await {
+                latest_l2 = claimed;
+            }
+            tokio::time::sleep(time::Duration::from_secs(self.interval_seconds)).await;
+        }
+    }
+
+    /// Collects preimage data based on synchronization status from the L2 chain and persists important finality
+    /// information such as the latest finalized L1 head hash.
+    ///
+    /// 1. **Check Synchronization Status**:
+    ///    - Fetches synchronization status from the underlying L2 client.
+    ///    - If the L2's finalized block number (`sync_status.finalized_l2.number`) is less than or equal to
+    ///      the provided `latest_l2`, no further processing is performed and `None` is returned.
+    ///
+    /// 2. **Retrieve L1 Head Hash**:
+    ///    - Obtains the hash of the latest L1 head and its raw finality data for L2 derivation purposes.
+    ///    - Returns `None` if this operation fails.
+    ///
+    /// 3. **Batch Collection Preparation**:
+    ///    - Prepares a batch of block ranges within the defined constraints (`max_concurrency` and `max_distance`)
+    ///      for processing, starting from the provided `latest_l2` and up to the finalized L2 block in the
+    ///      synchronization status.
+    ///
+    /// 4. **Store Finalized L1 Hash**:
+    ///    - Attempts to save the finalized L1 head hash and its associated finality data to the repository.
+    ///      Logs errors if the operation fails but continues processing.
+    ///
+    /// 5. **Parallel Collection**:
+    ///    - Executes collection tasks for the prepared block batch in parallel using the computed L1 head hash.
+    ///    - Logs and returns `None` on failure, or the latest L2 block number if successful.
+    ///
+    async fn collect(&self, latest_l2: u64) -> Option<u64> {
+        // Check sync status
+        let sync_status = self.client.sync_status().await;
+        let sync_status = match sync_status {
+            Ok(sync_status) => {
+                info!(
+                    "sync status: claimed_l2={}, next_claiming_l2={}, sync_finalized_l1={}",
+                    latest_l2, sync_status.finalized_l2.number, sync_status.finalized_l1.number
+                );
+                if sync_status.finalized_l2.number <= latest_l2 {
+                    return None;
+                }
+                sync_status
+            }
+            Err(e) => {
+                error!("Failed to get sync status {:?}", e);
+                return None;
+            }
+        };
+
+        // Get latest l1 head hash for L2 derivation
+        let (l1_head_hash, raw_finality_l1) = self.get_l1_head_hash(&sync_status).await?;
+
+        // Collect preimage from latest_l2 to finalized_l2
+        let mut batch = Vec::new();
+        let mut current = latest_l2;
+        for _ in 0..self.max_concurrency {
+            if current >= sync_status.finalized_l2.number {
+                break;
+            }
+            let end = std::cmp::min(current + self.max_distance, sync_status.finalized_l2.number);
+            batch.push((current, end));
+            current = end;
+        }
+
+        info!("derivation batch={:?}", batch);
+
+        // Save finalized_l1
+        if let Err(e) = self
+            .finalized_l1_repository
+            .upsert(&l1_head_hash, raw_finality_l1)
+            .await
+        {
+            error!(
+                "Failed to save finalized l1 head hash to db l1_head={}, {:?}",
+                l1_head_hash, e
+            );
+        }
+
+        if batch.is_empty() {
+            return None;
+        }
+
+        self.parallel_collect(l1_head_hash, batch)
+            .await
+            .unwrap_or_else(|e| {
+                error!("Failed to collect preimages {:?}", e);
+                None
+            })
+    }
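Reviewer note: a standalone restatement of the batch-planning loop in `collect()`, under the same `max_distance`/`max_concurrency` semantics (the sample values mirror the unit tests at the end of this file):

```rust
// Plans at most `max_concurrency` ranges of at most `max_distance` blocks each.
fn plan_batch(latest: u64, finalized: u64, max_distance: u64, max_concurrency: usize) -> Vec<(u64, u64)> {
    let mut batch = Vec::new();
    let mut current = latest;
    for _ in 0..max_concurrency {
        if current >= finalized {
            break;
        }
        let end = std::cmp::min(current + max_distance, finalized);
        batch.push((current, end));
        current = end;
    }
    batch
}

// latest_l2 = 100, finalized_l2 = 150, max_distance = 10, max_concurrency = 2:
assert_eq!(plan_batch(100, 150, 10, 2), vec![(100, 110), (110, 120)]);
// Blocks 121..=150 are picked up on subsequent collect() passes.
```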
+    /// Asynchronously retrieves the latest finalized L1 block hash for use in derivation.
+    ///
+    /// This function communicates with the beacon client to fetch the latest finalized L1 block data.
+    /// It ensures the returned block is up-to-date relative to the specified [`SyncStatus`].
+    ///
+    /// - The function fetches the raw `finality` update from the beacon client.
+    /// - It validates that the block is not outdated compared to the `sync_status`.
+    /// - In case the retrieved block number is outdated, it waits for a small delay (10 seconds) and retries.
+    /// - Logs error or warning messages when issues occur, including:
+    ///   - Failure to fetch or deserialize the finality update.
+    ///   - Delayed finality L1 blocks.
+    /// - If a valid and up-to-date finalized block is found, it returns the L1 block's hash and raw response.
+    ///
+    async fn get_l1_head_hash(&self, sync_status: &SyncStatus) -> Option<(B256, String)> {
+        let mut attempts_count = 0;
+        let (finality_l1, raw_finality_l1) = loop {
+            let raw_finality_l1 = match self
+                .beacon_client
+                .get_raw_light_client_finality_update()
+                .await
+            {
+                Ok(finality_l1) => finality_l1,
+                Err(e) => {
+                    error!("Failed to get finality update from beacon client {:?}", e);
+                    return None;
+                }
+            };
+            let finality_l1: LightClientFinalityUpdateResponse =
+                match serde_json::from_str(&raw_finality_l1) {
+                    Ok(value) => value,
+                    Err(e) => {
+                        error!("Failed to parse finality update from beacon client {:?}", e);
+                        return None;
+                    }
+                };
+            let block_number = finality_l1.data.finalized_header.execution.block_number;
+            if block_number < sync_status.finalized_l1.number {
+                if attempts_count > 30 {
+                    error!(
+                        "finality_l1 = {:?} delayed. attempts_count = {}",
+                        block_number, attempts_count
+                    );
+                    // It is intentional that the process doesn't exit with an error even after exceeding attempts_count,
+                    // as we would have no choice but to continue the loop anyway.
+                    // The purpose of attempts_count is to trigger error-level logs for monitoring and detection,
+                    // rather than to terminate the process.
+                } else {
+                    warn!(
+                        "finality_l1 = {:?} delayed. attempts_count = {}",
+                        block_number, attempts_count
+                    );
+                }
+                attempts_count += 1;
+                time::sleep(time::Duration::from_secs(10)).await;
+                continue;
+            }
+            break (finality_l1, raw_finality_l1);
+        };
+        let l1_head_hash = finality_l1.data.finalized_header.execution.block_hash;
+        info!(
+            "l1_head for derivation = {:?}",
+            finality_l1.data.finalized_header.execution
+        );
+        Some((l1_head_hash, raw_finality_l1))
+    }
+
+    /// Performs parallel collection of data within a specified range, processes it using multiple
+    /// asynchronous tasks, and commits the resulting batch.
+    ///
+    /// 1. Creates multiple asynchronous tasks to process the range of work defined in the `batch`.
+    ///    - Each range element (`start`, `end`) is processed by the `collect` function.
+    ///    - The `collect` function performs the desired collection logic with `l2_client` and `config`.
+    /// 2. The tasks are executed in parallel using `tokio::spawn`.
+    /// 3. Collects the results asynchronously, returning an error if any task fails.
+    /// 4. Once all tasks are complete, invokes the `commit_batch` method to commit the collected
+    ///    results.
+    async fn parallel_collect(
+        &self,
+        l1_head_hash: B256,
+        batch: Vec<(u64, u64)>,
+    ) -> anyhow::Result<Option<u64>> {
+        let mut tasks = vec![];
+
+        // Spawn tasks to collect preimages
+        for (start, end) in batch {
+            let l2_client = self.client.clone();
+            let config = self.config.clone();
+            let derivation_driver = self.derivation_driver.clone();
+            tasks.push(tokio::spawn(async move {
+                collect(
+                    l2_client,
+                    config,
+                    derivation_driver,
+                    l1_head_hash,
+                    start,
+                    end,
+                )
+                .await
+            }));
+        }
+
+        // Wait for all tasks to finish
+        let mut results = vec![];
+        for task in tasks {
+            results.push(task.await??);
+        }
+
+        // Commit
+        Ok(self.commit_batch(results).await)
+    }
+
+    /// Commits a batch of preimages to the database and returns the latest claimed block number.
+    ///
+    /// This method performs the following steps:
+    /// - Sorts the input preimages (`successes`) by their claimed block number in ascending order to
+    ///   ensure deterministic ordering of preimages.
+    /// - Iterates through the sorted preimages and commits each preimage into the database by
+    ///   invoking the `upsert` method of the `preimage_repository`.
+    /// - Tracks and updates the latest claimed block number during the iteration.
+    ///
+    async fn commit_batch(&self, mut successes: Vec<(PreimageMetadata, Vec<u8>)>) -> Option<u64> {
+        // Sort by claimed block number to ensure deterministic order of preimages
+        successes.sort_by(|a, b| a.0.claimed.cmp(&b.0.claimed));
+
+        let mut latest_l2 = None;
+        for (metadata, preimage) in successes {
+            // Commit preimages to db
+            let claimed = metadata.claimed;
+            if let Err(e) = self.preimage_repository.upsert(metadata, preimage).await {
+                error!("Failed to upsert preimage: {:?}", e);
+                return latest_l2;
+            }
+            latest_l2 = Some(claimed);
+        }
+        latest_l2
+    }
+}
+
+/// Collect preimage for range [start, end]
+async fn collect<L: L2Client, D: DerivationDriver>(
+    client: Arc<L>,
+    config: Arc<DerivationConfig>,
+    derivation_driver: Arc<D>,
+    l1_head_hash: B256,
+    start: u64,
+    end: u64,
+) -> anyhow::Result<(PreimageMetadata, Vec<u8>)> {
+    let agreed_block = client.output_root_at(start).await?;
+    let claiming_block = client.output_root_at(end).await?;
+
+    let request = DerivationRequest {
+        l1_head_hash,
+        agreed_l2_head_hash: agreed_block.block_ref.hash,
+        agreed_l2_output_root: agreed_block.output_root,
+        l2_output_root: claiming_block.output_root,
+        l2_block_number: end,
+    };
+
+    info!("derivation start : {:?}", &request);
+    let metadata = PreimageMetadata {
+        agreed: start,
+        claimed: end,
+        l1_head: l1_head_hash,
+    };
+    let result = derivation_driver.drive(config, request).await;
+    match result {
+        Ok(preimage) => {
+            info!("derivation success : {metadata:?}");
+            Ok((metadata, preimage))
+        }
+        Err(e) => {
+            error!("derivation failed : {metadata:?}, error={}", e);
+            Err(e)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::beacon_client::BeaconClient;
+    use crate::client::l2_client::{Block, L2Client, OutputRootAtBlock, SyncStatus};
+    use crate::derivation::host::single::config::Config;
+    use crate::derivation::host::single::handler::DerivationConfig;
+    use alloy_primitives::B256;
+    use anyhow::anyhow;
+    use axum::async_trait;
+    use clap::Parser;
+    use kona_genesis::RollupConfig;
+    use std::sync::Mutex;
+
+    // Mocks
+    struct MockL2Client {
+        sync_status: Option<SyncStatus>,
+        output_roots: std::collections::HashMap<u64, OutputRootAtBlock>,
+    }
+
+    #[async_trait]
+    impl L2Client for MockL2Client {
+        async fn chain_id(&self) -> anyhow::Result<u64> {
+            Ok(10)
+        }
+        async fn rollup_config(&self) -> anyhow::Result<RollupConfig> {
+            Err(anyhow!("unimplemented"))
+        }
+        async fn sync_status(&self) -> anyhow::Result<SyncStatus> {
+            self.sync_status.clone().ok_or(anyhow!("sync status error"))
+        }
+        async fn output_root_at(&self, number: u64) -> anyhow::Result<OutputRootAtBlock> {
+            self.output_roots
+                .get(&number)
+                .cloned()
+                .ok_or(anyhow!("no output root"))
+        }
+        async fn get_block_by_number(&self, _number: u64) -> anyhow::Result<Block> {
+            Err(anyhow!("unimplemented"))
+        }
+    }
+
+    struct MockBeaconClient {
+        finality_update: Option<String>,
+    }
+
+    #[async_trait]
+    impl BeaconClient for MockBeaconClient {
+        async fn get_raw_light_client_finality_update(&self) -> anyhow::Result<String> {
+            self.finality_update.clone().ok_or(anyhow!("error"))
+        }
+    }
+
+    struct MockDerivationDriver {
+        calls: Arc<Mutex<Vec<DerivationRequest>>>,
+    }
+
+    #[async_trait]
+    impl DerivationDriver for MockDerivationDriver {
+        async fn drive(
+            &self,
+            _config: Arc<DerivationConfig>,
+            request: DerivationRequest,
+        ) -> anyhow::Result<Vec<u8>> {
+            self.calls.lock().unwrap().push(request);
+            Ok(vec![0x1, 0x2, 0x3])
+        }
+    }
+
+    struct MockPreimageRepository {
+        upserted: Arc<Mutex<Vec<PreimageMetadata>>>,
+    }
+
+    #[async_trait]
+    impl PreimageRepository for MockPreimageRepository {
+        async fn upsert(
+            &self,
+            metadata: PreimageMetadata,
+            _preimage: Vec<u8>,
+        ) -> anyhow::Result<()> {
+            self.upserted.lock().unwrap().push(metadata);
+            Ok(())
+        }
+        async fn get(&self, _metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>> {
+            Ok(vec![])
+        }
+        async fn list_metadata(&self, _lt: Option<u64>, _gt: Option<u64>) -> Vec<PreimageMetadata> {
+            vec![]
+        }
+        async fn latest_metadata(&self) -> Option<PreimageMetadata> {
+            None
+        }
+        async fn purge_expired(&self) -> anyhow::Result<()> {
+            Ok(())
+        }
+    }
+
+    struct MockFinalizedL1Repository {
+        upserted: Arc<Mutex<Vec<B256>>>,
+    }
+
+    #[async_trait]
+    impl FinalizedL1Repository for MockFinalizedL1Repository {
+        async fn upsert(
+            &self,
+            l1_head_hash: &B256,
+            _raw_finalized_l1: String,
+        ) -> anyhow::Result<()> {
+            self.upserted.lock().unwrap().push(*l1_head_hash);
+            Ok(())
+        }
+        async fn get(&self, _l1_head_hash: &B256) -> anyhow::Result<String> {
+            Ok("".into())
+        }
+        async fn purge_expired(&self) -> anyhow::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_collector_collect() {
+        let l1_head = B256::repeat_byte(0x11);
+        let sync_status = SyncStatus {
+            current_l1: dummy_l1(100),
+            current_l1_finalized: dummy_l1(90),
+            head_l1: dummy_l1(100),
+            safe_l1: dummy_l1(95),
+            finalized_l1: dummy_l1(90),
+            unsafe_l2: dummy_l2(200),
+            safe_l2: dummy_l2(190),
+            finalized_l2: dummy_l2(150),
+            pending_safe_l2: dummy_l2(190),
+        };
+
+        // Prepare L2Client mock
+        let mut output_roots = std::collections::HashMap::new();
+        output_roots.insert(100, dummy_output_root(100)); // agreed
+        output_roots.insert(110, dummy_output_root(110)); // target
+        output_roots.insert(120, dummy_output_root(120));
+        output_roots.insert(130, dummy_output_root(130));
+        output_roots.insert(140, dummy_output_root(140));
+        output_roots.insert(150, dummy_output_root(150));
+
+        let l2_client = Arc::new(MockL2Client {
+            sync_status: Some(sync_status),
+            output_roots,
+        });
+
+        // Beacon client mock
+        let update_json = serde_json::json!({
+            "data": {
+                "finalized_header": {
+                    "execution": {
+                        "block_hash": l1_head,
+                        "block_number": "95"
+                    }
+                }
+            }
+        })
+        .to_string();
+        let beacon_client = Arc::new(MockBeaconClient {
+            finality_update: Some(update_json),
+        });
+
+        let conf = Config::parse_from([
+            "exe",
+            "--l1-beacon-address",
+            "http://localhost:5052",
+            "--l2-node-address",
+            "http://localhost:8545",
+            "--l2-rollup-address",
+            "http://localhost:8545",
+            "--preimage-dir",
+            "/tmp",
+            "--finalized-l1-dir",
+            "/tmp",
+            "--initial-claimed-l2",
+            "0",
+        ]); // Partial config ok
+        let derivation_config = Arc::new(DerivationConfig {
+            config: conf,
+            rollup_config: None,
+            l2_chain_id: 10,
+            l1_chain_config: None,
+        });
+
+        let mock_derivations = Arc::new(Mutex::new(vec![]));
+        let derivation_driver = Arc::new(MockDerivationDriver {
+            calls: mock_derivations.clone(),
+        });
+
+        let mock_preimage_repo = Arc::new(MockPreimageRepository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+        let mock_finalized_repo = Arc::new(MockFinalizedL1Repository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+
+        let collector = PreimageCollector {
+            client: l2_client,
+            beacon_client,
+            derivation_driver,
+            config: derivation_config,
+            preimage_repository: mock_preimage_repo.clone(),
+            finalized_l1_repository: mock_finalized_repo.clone(),
+            max_distance: 10,
+            max_concurrency: 2,
+            initial_claimed: 0,
+            interval_seconds: 1,
+        };
+
+        // Start from 100
+        let new_head = collector.collect(100).await;
+
+        assert_eq!(new_head, Some(120)); // Should reach limit 120 (100 + 2*10)
+
+        // Verify derivation calls
+        let calls = mock_derivations.lock().unwrap();
+        // 100->110, 110->120 (2 chunks of size 10)
+        assert_eq!(calls.len(), 2);
+
+        // Verify finalized L1 saved
+        assert_eq!(mock_finalized_repo.upserted.lock().unwrap().len(), 1);
+        assert_eq!(mock_finalized_repo.upserted.lock().unwrap()[0], l1_head);
+
+        // Verify preimages saved
+        assert_eq!(mock_preimage_repo.upserted.lock().unwrap().len(), 2);
+    }
+
+    fn dummy_l1(number: u64) -> crate::client::l2_client::L1Header {
+        crate::client::l2_client::L1Header {
+            hash: B256::ZERO,
+            number,
+            parent_hash: B256::ZERO,
+            timestamp: 0,
+        }
+    }
+
+    fn dummy_l2(number: u64) -> crate::client::l2_client::L2Header {
+        crate::client::l2_client::L2Header {
+            hash: B256::ZERO,
+            number,
+            parent_hash: B256::ZERO,
+            timestamp: 0,
+            l1origin: crate::client::l2_client::L1Origin {
+                hash: B256::ZERO,
+                number: 0,
+            },
+            sequence_number: 0,
+        }
+    }
+
+    fn dummy_output_root(number: u64) -> OutputRootAtBlock {
+        OutputRootAtBlock {
+            output_root: B256::ZERO,
+            block_ref: crate::client::l2_client::L2BlockRef {
+                hash: B256::ZERO,
+                number,
+                l1_origin: crate::client::l2_client::L1Origin {
+                    hash: B256::ZERO,
+                    number: 0,
+                },
+            },
+        }
+    }
+    #[tokio::test]
+    async fn test_collector_sync_status_reached() {
+        let sync_status = SyncStatus {
+            current_l1: dummy_l1(100),
+            current_l1_finalized: dummy_l1(90),
+            head_l1: dummy_l1(100),
+            safe_l1: dummy_l1(95),
+            finalized_l1: dummy_l1(90),
+            unsafe_l2: dummy_l2(200),
+            safe_l2: dummy_l2(190),
+            finalized_l2: dummy_l2(150),
+            pending_safe_l2: dummy_l2(190),
+        };
+
+        let l2_client = Arc::new(MockL2Client {
+            sync_status: Some(sync_status),
+            output_roots: std::collections::HashMap::new(),
+        });
+
+        // Beacon client mock (not used if sync status check fails early)
+        let beacon_client = Arc::new(MockBeaconClient {
+            finality_update: None,
+        });
+
+        let conf = Config::parse_from(["exe", "--initial-claimed-l2", "0"]); // Minimal valid config
+        let derivation_config = Arc::new(DerivationConfig {
+            config: conf,
+            rollup_config: None,
+            l2_chain_id: 10,
+            l1_chain_config: None,
+        });
+
+        let derivation_driver = Arc::new(MockDerivationDriver {
+            calls: Arc::new(Mutex::new(vec![])),
+        });
+
+        let mock_preimage_repo = Arc::new(MockPreimageRepository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+        let mock_finalized_repo = Arc::new(MockFinalizedL1Repository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+
+        let collector = PreimageCollector {
+            client: l2_client,
+            beacon_client,
+            derivation_driver,
+            config: derivation_config,
+            preimage_repository: mock_preimage_repo,
+            finalized_l1_repository: mock_finalized_repo,
+            max_distance: 10,
+            max_concurrency: 2,
+            initial_claimed: 0,
+            interval_seconds: 1,
+        };
+
+        // latest_l2 (150) == finalized_l2 (150)
+        let result = collector.collect(150).await;
+        assert_eq!(result, None);
+
+        // latest_l2 (160) > finalized_l2 (150)
+        let result = collector.collect(160).await;
+        assert_eq!(result, None);
+    }
+
+    #[tokio::test]
+    async fn test_collector_beacon_client_error() {
+        let sync_status = SyncStatus {
+            current_l1: dummy_l1(100),
+            current_l1_finalized: dummy_l1(90),
+            head_l1: dummy_l1(100),
+            safe_l1: dummy_l1(95),
+            finalized_l1: dummy_l1(90),
+            unsafe_l2: dummy_l2(200),
+            safe_l2: dummy_l2(190),
+            finalized_l2: dummy_l2(150),
+            pending_safe_l2: dummy_l2(190),
+        };
+
+        let l2_client = Arc::new(MockL2Client {
+            sync_status: Some(sync_status),
+            output_roots: std::collections::HashMap::new(),
+        });
+
+        // Beacon client mock returning error (None)
+        let beacon_client = Arc::new(MockBeaconClient {
+            finality_update: None,
+        });
+
+        let conf = Config::parse_from(["exe", "--initial-claimed-l2", "0"]);
+        let derivation_config = Arc::new(DerivationConfig {
+            config: conf,
+            rollup_config: None,
+            l2_chain_id: 10,
+            l1_chain_config: None,
+        });
+
+        let derivation_driver = Arc::new(MockDerivationDriver {
+            calls: Arc::new(Mutex::new(vec![])),
+        });
+
+        let mock_preimage_repo = Arc::new(MockPreimageRepository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+        let mock_finalized_repo = Arc::new(MockFinalizedL1Repository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+
+        let collector = PreimageCollector {
+            client: l2_client,
+            beacon_client,
+            derivation_driver,
+            config: derivation_config,
+            preimage_repository: mock_preimage_repo,
+            finalized_l1_repository: mock_finalized_repo,
+            max_distance: 10,
+            max_concurrency: 2,
+            initial_claimed: 0,
+            interval_seconds: 1,
+        };
+
+        // Should return None due to beacon client error
+        let result = collector.collect(100).await;
+        assert_eq!(result, None);
+    }
+
+    struct MockDerivationDriverError;
+    #[async_trait]
+    impl DerivationDriver for MockDerivationDriverError {
+        async fn drive(
+            &self,
+            _config: Arc<DerivationConfig>,
+            _request: DerivationRequest,
+        ) -> anyhow::Result<Vec<u8>> {
+            Err(anyhow!("derivation error"))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_collector_derivation_failure() {
+        let l1_head = B256::repeat_byte(0x11);
+        let sync_status = SyncStatus {
+            current_l1: dummy_l1(100),
+            current_l1_finalized: dummy_l1(90),
+            head_l1: dummy_l1(100),
+            safe_l1: dummy_l1(95),
+            finalized_l1: dummy_l1(90),
+            unsafe_l2: dummy_l2(200),
+            safe_l2: dummy_l2(190),
+            finalized_l2: dummy_l2(150),
+            pending_safe_l2: dummy_l2(190),
+        };
+
+        let mut output_roots = std::collections::HashMap::new();
+        output_roots.insert(100, dummy_output_root(100));
+        output_roots.insert(110, dummy_output_root(110));
+
+        let l2_client = Arc::new(MockL2Client {
+            sync_status: Some(sync_status),
+            output_roots,
+        });
+
+        let update_json = serde_json::json!({
+            "data": {
+                "finalized_header": {
+                    "execution": {
+                        "block_hash": l1_head,
+                        "block_number": "95"
+                    }
+                }
+            }
+        })
+        .to_string();
+        let beacon_client = Arc::new(MockBeaconClient {
+            finality_update: Some(update_json),
+        });
+
+        let conf = Config::parse_from(["exe", "--initial-claimed-l2", "0"]);
+        let derivation_config = Arc::new(DerivationConfig {
+            config: conf,
+            rollup_config: None,
+            l2_chain_id: 10,
+            l1_chain_config: None,
+        });
+
+        let derivation_driver = Arc::new(MockDerivationDriverError); // Fails derivation
+
+        let mock_preimage_repo = Arc::new(MockPreimageRepository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+        let mock_finalized_repo = Arc::new(MockFinalizedL1Repository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+
+        let collector = PreimageCollector {
+            client: l2_client,
+            beacon_client,
+            derivation_driver,
+            config: derivation_config,
+            preimage_repository: mock_preimage_repo.clone(),
+            finalized_l1_repository: mock_finalized_repo,
+            max_distance: 10,
+            max_concurrency: 2,
+            initial_claimed: 0,
+            interval_seconds: 1,
+        };
+
+        // Should return None because derivation failed (triggering retry)
+        let result = collector.collect(100).await;
+        assert_eq!(result, None);
+
+        // Verify preimages were NOT saved
+        assert_eq!(mock_preimage_repo.upserted.lock().unwrap().len(), 0);
+    }
+
+    struct MockPreimageRepositoryPartialError {
+        upserted: Arc<Mutex<Vec<PreimageMetadata>>>,
+        fail_index: usize,
+    }
+
+    #[async_trait]
+    impl PreimageRepository for MockPreimageRepositoryPartialError {
+        async fn upsert(
+            &self,
+            metadata: PreimageMetadata,
+            _preimage: Vec<u8>,
+        ) -> anyhow::Result<()> {
+            let mut upserted = self.upserted.lock().unwrap();
+            if upserted.len() == self.fail_index {
+                return Err(anyhow!("partial error"));
+            }
+            upserted.push(metadata);
+            Ok(())
+        }
+        async fn get(&self, _metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>> {
+            Ok(vec![])
+        }
+        async fn list_metadata(&self, _lt: Option<u64>, _gt: Option<u64>) -> Vec<PreimageMetadata> {
+            vec![]
+        }
+        async fn latest_metadata(&self) -> Option<PreimageMetadata> {
+            None
+        }
+        async fn purge_expired(&self) -> anyhow::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_collector_commit_batch_partial_failure() {
+        let l1_head = B256::repeat_byte(0x11);
+        let sync_status = SyncStatus {
+            current_l1: dummy_l1(100),
+            current_l1_finalized: dummy_l1(90),
+            head_l1: dummy_l1(100),
+            safe_l1: dummy_l1(95),
+            finalized_l1: dummy_l1(90),
+            unsafe_l2: dummy_l2(200),
+            safe_l2: dummy_l2(190),
+            finalized_l2: dummy_l2(150),
+            pending_safe_l2: dummy_l2(190),
+        };
+
+        let mut output_roots = std::collections::HashMap::new();
+        output_roots.insert(100, dummy_output_root(100));
+        output_roots.insert(110, dummy_output_root(110));
+        output_roots.insert(120, dummy_output_root(120));
+
+        let l2_client = Arc::new(MockL2Client {
+            sync_status: Some(sync_status),
+            output_roots,
+        });
+
+        let update_json = serde_json::json!({
+            "data": {
+                "finalized_header": {
+                    "execution": {
+                        "block_hash": l1_head,
+                        "block_number": "95"
+                    }
+                }
+            }
+        })
+        .to_string();
+        let beacon_client = Arc::new(MockBeaconClient {
+            finality_update: Some(update_json),
+        });
+
+        let conf = Config::parse_from(["exe", "--initial-claimed-l2", "0"]);
+        let derivation_config = Arc::new(DerivationConfig {
+            config: conf,
+            rollup_config: None,
+            l2_chain_id: 10,
+            l1_chain_config: None,
+        });
+
+        let derivation_driver = Arc::new(MockDerivationDriver {
+            calls: Arc::new(Mutex::new(vec![])),
+        });
+
+        // Fail on the 2nd upsert (index 1)
+        let mock_preimage_repo = Arc::new(MockPreimageRepositoryPartialError {
+            upserted: Arc::new(Mutex::new(vec![])),
+            fail_index: 1,
+        });
+        let mock_finalized_repo = Arc::new(MockFinalizedL1Repository {
+            upserted: Arc::new(Mutex::new(vec![])),
+        });
+
+        let collector = PreimageCollector {
+            client: l2_client,
+            beacon_client,
+            derivation_driver,
+            config: derivation_config,
+            preimage_repository: mock_preimage_repo.clone(),
+            finalized_l1_repository: mock_finalized_repo,
+            max_distance: 10,
+            max_concurrency: 2,
+            initial_claimed: 0,
+            interval_seconds: 1,
+        };
+
+        // Collect from 100.
+        // Batch will be [100->110, 110->120].
+        // 1st upsert (110) succeeds.
+        // 2nd upsert (120) fails.
+        // Should return Some(110).
+        let result = collector.collect(100).await;
+
+        assert_eq!(result, Some(110));
+
+        // Verify only 1 preimage saved
+        assert_eq!(mock_preimage_repo.upserted.lock().unwrap().len(), 1);
+        assert_eq!(mock_preimage_repo.upserted.lock().unwrap()[0].claimed, 110);
+    }
+}
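Reviewer note: as the doc comment on `start()` says, the loop never returns, so a caller is expected to run the collector on its own task and manage shutdown externally. A sketch under that assumption (construction of `collector` elided):

```rust
// start() loops forever; hold the JoinHandle (or use a cancellation token)
// so the embedding process can shut down in an orderly way.
let handle = tokio::spawn(async move { collector.start().await });
// ... later, on shutdown:
handle.abort();
```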
diff --git a/server/src/data/file_finalized_l1_repository.rs b/server/src/data/file_finalized_l1_repository.rs
new file mode 100644
index 0000000..22dd0df
--- /dev/null
+++ b/server/src/data/file_finalized_l1_repository.rs
@@ -0,0 +1,196 @@
+use crate::data::finalized_l1_repository::FinalizedL1Repository;
+use alloy_primitives::B256;
+use axum::async_trait;
+use std::time;
+use std::time::Duration;
+use tokio::fs;
+use tokio::fs::DirEntry;
+
+#[derive(Clone)]
+pub struct FileFinalizedL1Repository {
+    dir: String,
+    ttl: Duration,
+}
+impl FileFinalizedL1Repository {
+    pub fn new(parent_dir: &str, ttl: Duration) -> anyhow::Result<Self> {
+        let path = std::path::Path::new(parent_dir);
+        if !path.exists() {
+            return Err(anyhow::anyhow!("directory does not exist: {parent_dir}"));
+        }
+        if !path.is_dir() {
+            return Err(anyhow::anyhow!("path is not a directory: {parent_dir}"));
+        }
+        Ok(Self {
+            dir: parent_dir.to_string(),
+            ttl,
+        })
+    }
+
+    fn path(&self, l1_head_hash: &B256) -> String {
+        format!("{}/{}.json", self.dir, l1_head_hash)
+    }
+
+    async fn entries(dir: &str) -> anyhow::Result<Vec<DirEntry>> {
+        let mut file_list = vec![];
+        let mut entries = fs::read_dir(dir)
+            .await
+            .map_err(|e| anyhow::anyhow!("failed to read dir: {dir:?}, error={e}"))?;
+        while let Some(entry) = entries.next_entry().await? {
+            match entry.file_name().to_str() {
+                None => continue,
+                Some(name) => {
+                    if !name.ends_with(".tmp") {
+                        file_list.push(entry);
+                    }
+                }
+            }
+        }
+        Ok(file_list)
+    }
+}
+
+#[async_trait]
+impl FinalizedL1Repository for FileFinalizedL1Repository {
+    async fn upsert(&self, l1_head_hash: &B256, raw_finalized_l1: String) -> anyhow::Result<()> {
+        let path = self.path(l1_head_hash);
+        let tmp_path = format!("{path}.tmp");
+        fs::write(&tmp_path, raw_finalized_l1).await?;
+        fs::rename(&tmp_path, &path).await?;
+        Ok(())
+    }
+
+    async fn get(&self, l1_head_hash: &B256) -> anyhow::Result<String> {
+        let raw_finalized_l1 = fs::read_to_string(self.path(l1_head_hash)).await?;
+        Ok(raw_finalized_l1)
+    }
+
+    async fn purge_expired(&self) -> anyhow::Result<()> {
+        let now = time::SystemTime::now();
+        for entry in Self::entries(&self.dir).await? {
+            let metadata = entry.metadata().await?;
+            let modified = metadata.modified()?;
+            let expired = modified
+                .checked_add(self.ttl)
+                .ok_or_else(|| anyhow::anyhow!("TTL duration overflow when calculating expiration time"))?;
+            if now >= expired {
+                fs::remove_file(entry.path()).await?;
+            }
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::time::Duration;
+
+    #[test]
+    fn test_new_with_non_existent_dir() {
+        let res =
+            FileFinalizedL1Repository::new("/path/to/non/existent/dir", Duration::from_secs(1));
+        assert!(res.is_err());
+        assert_eq!(
+            res.err().unwrap().to_string(),
+            "directory does not exist: /path/to/non/existent/dir"
+        );
+    }
+
+    #[test]
+    fn test_new_with_existing_dir() {
+        let temp = std::env::temp_dir();
+        let path = temp.to_str().unwrap();
+        let res = FileFinalizedL1Repository::new(path, Duration::from_secs(1));
+        assert!(res.is_ok());
+    }
+
+    fn unique_test_dir(suffix: &str) -> String {
+        let mut dir = std::env::temp_dir();
+        let ts = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_nanos();
+        let pid = std::process::id();
+        dir.push(format!(
+            "optimism_preimage_maker_test_l1_{pid}_{ts}_{suffix}"
+        ));
+        std::fs::create_dir_all(&dir).expect("create temp dir");
+        dir.to_string_lossy().to_string()
+    }
+
+    #[tokio::test]
+    async fn test_upsert_and_get() {
+        let dir = unique_test_dir("upsert");
+        let repo = FileFinalizedL1Repository::new(&dir, Duration::from_secs(1)).expect("new repo");
+
+        let h = B256::from([1u8; 32]);
+        let data = "some json data".to_string();
+
+        repo.upsert(&h, data.clone()).await.expect("upsert");
+
+        let got = repo.get(&h).await.expect("get");
+        assert_eq!(got, data);
+
+        let missing = B256::from([2u8; 32]);
+        let res = repo.get(&missing).await;
+        assert!(res.is_err());
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+
+    #[tokio::test]
+    async fn test_purge_expired() {
+        let dir = unique_test_dir("purge");
+        // Short TTL
+        let repo =
+            FileFinalizedL1Repository::new(&dir, Duration::from_millis(100)).expect("new repo");
+
+        let h1 = B256::from([0xaau8; 32]);
+        let data = "old data".to_string();
+        repo.upsert(&h1, data).await.expect("upsert h1");
+
+        // Wait for expiration
+        tokio::time::sleep(Duration::from_millis(200)).await;
+
+        let h2 = B256::from([0xbbu8; 32]);
+        repo.upsert(&h2, "new data".to_string())
+            .await
+            .expect("upsert h2");
+
+        // Purge should remove h1 but keep h2.
+        // Note: the implementation compares `metadata.modified()` + TTL against now, and
+        // upsert rewrites the file, so h1's modification time is old enough to expire
+        // while h2's is fresh.
+        repo.purge_expired().await.expect("purge");
+
+        assert!(repo.get(&h1).await.is_err(), "h1 should be purged");
+        assert!(repo.get(&h2).await.is_ok(), "h2 should be kept");
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+    #[tokio::test]
+    async fn test_entries_ignores_tmp_files() {
+        let dir = unique_test_dir("tmp_ignore");
+        let repo = FileFinalizedL1Repository::new(&dir, Duration::from_secs(1)).expect("new repo");
+
+        // Create a normal file via upsert
+        let h1 = B256::from([0xccu8; 32]);
+        repo.upsert(&h1, "data".to_string()).await.expect("upsert");
+
+        // Manually create a .tmp file
+        let path = repo.path(&h1);
+        let tmp_path = format!("{path}.tmp");
+        tokio::fs::write(&tmp_path, "partial data")
+            .await
+            .expect("write tmp");
+
+        // Verify entries() ignores the .tmp file
+        let entries = FileFinalizedL1Repository::entries(&dir)
+            .await
+            .expect("entries");
+        // Should only have the normal file, not the .tmp file
+        assert_eq!(entries.len(), 1);
+        assert!(!entries[0].file_name().to_string_lossy().ends_with(".tmp"));
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+}
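Reviewer note: both file-backed repositories (this one and `FilePreimageRepository` below) share the same durability pattern: write to a `.tmp` sibling, then rename into place, with `entries()` filtering out `*.tmp`. The core of the pattern as it appears in `upsert`:

```rust
// rename() is atomic within a single POSIX filesystem, so readers never see a
// half-written file; a crash mid-write leaves only a *.tmp leftover, which
// entries() ignores and a later upsert simply overwrites.
let tmp_path = format!("{path}.tmp");
fs::write(&tmp_path, bytes).await?;
fs::rename(&tmp_path, &path).await?;
```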
diff --git a/server/src/data/file_preimage_repository.rs b/server/src/data/file_preimage_repository.rs
new file mode 100644
index 0000000..44ded80
--- /dev/null
+++ b/server/src/data/file_preimage_repository.rs
@@ -0,0 +1,394 @@
+use crate::data::preimage_repository::{PreimageMetadata, PreimageRepository};
+use axum::async_trait;
+use std::collections::HashSet;
+use std::sync::{Arc, RwLock};
+use std::time;
+use tokio::fs;
+use tokio::fs::DirEntry;
+use tracing::{error, info};
+
+#[derive(Clone)]
+pub struct FilePreimageRepository {
+    dir: String,
+    metadata_list: Arc<RwLock<HashSet<PreimageMetadata>>>,
+    ttl: time::Duration,
+}
+
+impl FilePreimageRepository {
+    pub async fn new(parent_dir: &str, ttl: time::Duration) -> anyhow::Result<Self> {
+        let path = std::path::Path::new(parent_dir);
+        if !path.exists() {
+            return Err(anyhow::anyhow!("directory does not exist: {parent_dir}"));
+        }
+        if !path.is_dir() {
+            return Err(anyhow::anyhow!("path is not a directory: {parent_dir}"));
+        }
+        let metadata_list = Self::load_metadata(parent_dir).await?;
+        info!("loaded metadata: {:?}", metadata_list.len());
+        Ok(Self {
+            dir: parent_dir.to_string(),
+            metadata_list: Arc::new(RwLock::new(metadata_list)),
+            ttl,
+        })
+    }
+
+    async fn load_metadata(dir: &str) -> anyhow::Result<HashSet<PreimageMetadata>> {
+        let mut metadata_list = HashSet::new();
+
+        let entries = Self::entries(dir).await?;
+        for entry in entries {
+            let name_osstr = entry.file_name();
+            let name = match name_osstr.to_str() {
+                Some(s) => s.to_string(),
+                None => {
+                    error!(
+                        "skipping file with invalid UTF-8 filename: {:?}",
+                        name_osstr
+                    );
+                    continue;
+                }
+            };
+            let metadata = PreimageMetadata::try_from(name.as_str());
+            match metadata {
+                Err(e) => {
+                    error!("failed to parse metadata: {:?}, error={}", name, e);
+                }
+                Ok(metadata) => {
+                    metadata_list.insert(metadata);
+                }
+            }
+        }
+        Ok(metadata_list)
+    }
+
+    async fn entries(dir: &str) -> anyhow::Result<Vec<DirEntry>> {
+        let mut file_list = vec![];
+        let mut entries = fs::read_dir(dir)
+            .await
+            .map_err(|e| anyhow::anyhow!("failed to read dir: {dir:?}, error={e}"))?;
+        while let Some(entry) = entries.next_entry().await? {
+            match entry.file_name().to_str() {
+                None => continue,
+                Some(name) => {
+                    if !name.ends_with(".tmp") {
+                        file_list.push(entry);
+                    }
+                }
+            }
+        }
+        Ok(file_list)
+    }
+
+    fn path(&self, metadata: &PreimageMetadata) -> String {
+        format!(
+            "{}/{}_{}_{}",
+            self.dir, metadata.agreed, metadata.claimed, metadata.l1_head
+        )
+    }
+    fn sort(metadata_list: &mut [PreimageMetadata]) {
+        metadata_list.sort_by(|a, b| a.claimed.cmp(&b.claimed));
+    }
+}
+
+#[async_trait]
+impl PreimageRepository for FilePreimageRepository {
+    async fn upsert(&self, metadata: PreimageMetadata, preimage: Vec<u8>) -> anyhow::Result<()> {
+        let path = self.path(&metadata);
+        let tmp_path = format!("{path}.tmp");
+        fs::write(&tmp_path, preimage).await?;
+        fs::rename(&tmp_path, &path).await?;
+
+        // NOTE: The process should be restarted if locking fails, to avoid dirty metadata.
+        let mut lock = self.metadata_list.write().unwrap();
+        lock.insert(metadata);
+        Ok(())
+    }
+    async fn get(&self, metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>> {
+        let path = self.path(metadata);
+        let preimage = fs::read(&path).await?;
+        Ok(preimage)
+    }
+    async fn list_metadata(
+        &self,
+        lt_claimed: Option<u64>,
+        gt_claimed: Option<u64>,
+    ) -> Vec<PreimageMetadata> {
+        let mut raw = {
+            let lock = self.metadata_list.read().unwrap();
+            lock.iter().cloned().collect::<Vec<_>>()
+        };
+        Self::sort(&mut raw);
+        let raw = match gt_claimed {
+            None => raw,
+            Some(gt_claimed) => raw.into_iter().filter(|m| m.claimed > gt_claimed).collect(),
+        };
+        match lt_claimed {
+            None => raw,
+            Some(lt_claimed) => raw.into_iter().filter(|m| m.claimed < lt_claimed).collect(),
+        }
+    }
+
+    async fn latest_metadata(&self) -> Option<PreimageMetadata> {
+        let mut result = self.list_metadata(None, None).await;
+        result.pop()
+    }
+
+    async fn purge_expired(&self) -> anyhow::Result<()> {
+        let now = time::SystemTime::now();
+        let mut target = vec![];
+        for entry in Self::entries(&self.dir).await? {
+            let metadata = entry.metadata().await?;
+            let modified = metadata.modified()?;
+            let expired = modified.checked_add(self.ttl).ok_or_else(|| {
+                anyhow::anyhow!("TTL duration overflow when calculating expiration time")
+            })?;
+            if now >= expired {
+                target.push(entry);
+            }
+        }
+        info!("purging expired preimage metadata: {:?}", target.len());
+
+        // delete files from disk
+        let mut target_cached = vec![];
+        for entry in target {
+            let name_osstr = entry.file_name();
+            let name = match name_osstr.to_str() {
+                Some(s) => s.to_string(),
+                None => {
+                    error!("Skipping file with non-UTF-8 filename: {:?}", name_osstr);
+                    continue;
+                }
+            };
+            fs::remove_file(entry.path()).await?;
+            if let Ok(v) = PreimageMetadata::try_from(name.as_str()) {
+                target_cached.push(v);
+            }
+        }
+
+        // Remove from cache
+        {
+            let mut lock = self.metadata_list.write().unwrap();
+            for v in target_cached {
+                lock.remove(&v);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::FilePreimageRepository;
+    use crate::data::preimage_repository::{PreimageMetadata, PreimageRepository};
+    use alloy_primitives::B256;
+    use std::path::PathBuf;
+    use std::time::Duration;
+
+    fn unique_test_dir(suffix: &str) -> String {
+        let mut dir = std::env::temp_dir();
+        let ts = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap()
+            .as_nanos();
+        let pid = std::process::id();
+        dir.push(format!("optimism_preimage_maker_test_{pid}_{ts}_{suffix}"));
+        std::fs::create_dir_all(&dir).expect("create temp dir");
+        dir.to_string_lossy().to_string()
+    }
+
+    fn make_meta(agreed: u64, claimed: u64, head: B256) -> PreimageMetadata {
+        PreimageMetadata {
+            agreed,
+            claimed,
+            l1_head: head,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_new_empty_dir() {
+        let dir = unique_test_dir("empty");
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("repo new");
+        let list = repo.list_metadata(None, None).await;
+        assert!(list.is_empty());
+        let latest = repo.latest_metadata().await;
+        assert!(latest.is_none());
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+
+    #[tokio::test]
+    async fn test_new_with_non_existent_dir() {
+        let res =
+            FilePreimageRepository::new("/path/to/non/existent/dir", Duration::from_secs(1)).await;
+        assert!(res.is_err());
+        assert_eq!(
+            res.err().unwrap().to_string(),
+            "directory does not exist: /path/to/non/existent/dir"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_upsert_and_get_and_latest() {
+        let dir = unique_test_dir("upsert");
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("repo new");
+
+        let h = B256::from([1u8; 32]);
+        let m1 = make_meta(1, 2, h);
+        let m2 = make_meta(2, 3, h);
+
+        let p1 = b"hello".to_vec();
+        let p2 = b"world".to_vec();
+
+        repo.upsert(m1.clone(), p1.clone())
+            .await
+            .expect("upsert m1");
+        repo.upsert(m2.clone(), p2.clone())
+            .await
+            .expect("upsert m2");
+
+        let got1 = repo.get(&m1).await.expect("get m1");
+        assert_eq!(got1, p1);
+        let got2 = repo.get(&m2).await.expect("get m2");
+        assert_eq!(got2, p2);
+
+        // list should be sorted by claimed ascending
+        let list = repo.list_metadata(None, None).await;
+        assert_eq!(list, vec![m1.clone(), m2.clone()]);
+
+        // latest should be the last after sorting (m2)
+        let latest = repo.latest_metadata().await;
+        assert_eq!(latest, Some(m2.clone()));
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+
+    #[tokio::test]
+    async fn test_list_metadata_filter_and_sort() {
+        let dir = unique_test_dir("filter");
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("repo new");
+        let h = B256::from([2u8; 32]);
+        let m_a = make_meta(5, 6, h);
+        let m_b = make_meta(1, 2, h);
+        let m_c = make_meta(3, 4, h);
+
+        repo.upsert(m_a.clone(), vec![10]).await.unwrap();
+        repo.upsert(m_b.clone(), vec![20]).await.unwrap();
+        repo.upsert(m_c.clone(), vec![30]).await.unwrap();
+
+        let all = repo.list_metadata(None, None).await;
+        assert_eq!(all, vec![m_b.clone(), m_c.clone(), m_a.clone()]);
+
+        // filter by claimed > x
+        let filtered = repo.list_metadata(None, Some(2)).await; // claimed > 2 => m_c(4), m_a(6)
+        assert_eq!(filtered, vec![m_c.clone(), m_a.clone()]);
+
+        // filter by claimed < x
+        let filtered = repo.list_metadata(Some(4), None).await; // claimed < 4 => m_b(2)
+        assert_eq!(filtered, vec![m_b.clone()]);
+
+        // filter by gt < claimed < lt
+        let filtered = repo.list_metadata(Some(7), Some(4)).await; // 4 < claimed < 7 => m_a(6)
+        assert_eq!(filtered, vec![m_a.clone()]);
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+
+    #[tokio::test]
+    async fn test_load_existing_files() {
+        let dir = unique_test_dir("load");
+
+        // Pre-create files with valid names and contents
+        let h1 = B256::from([3u8; 32]);
+        let h2 = B256::from([4u8; 32]);
+        let m1 = make_meta(10, 11, h1);
+        let m2 = make_meta(12, 13, h2);
+
+        let f1 = format!("{}_{}_{}", m1.agreed, m1.claimed, m1.l1_head);
+        let f2 = format!("{}_{}_{}", m2.agreed, m2.claimed, m2.l1_head);
+        let mut p1 = PathBuf::from(&dir);
+        p1.push(&f1);
+        let mut p2 = PathBuf::from(&dir);
+        p2.push(&f2);
+
+        tokio::fs::write(&p1, b"alpha").await.unwrap();
+        tokio::fs::write(&p2, b"beta").await.unwrap();
+        // invalid file should be ignored
+        let mut junk = PathBuf::from(&dir);
+        junk.push("invalid_name.txt");
+        tokio::fs::write(&junk, b"junk").await.unwrap();
+
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("repo new");
+
+        let list = repo.list_metadata(None, None).await;
+        // Should be sorted by claimed
+        assert_eq!(list, vec![m1.clone(), m2.clone()]);
+
+        let g1 = repo.get(&m1).await.unwrap();
+        assert_eq!(g1, b"alpha");
+        let g2 = repo.get(&m2).await.unwrap();
+        assert_eq!(g2, b"beta");
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+
+    #[tokio::test]
+    async fn test_overwrite_same_metadata_replaces_file() {
+        let dir = unique_test_dir("overwrite");
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("repo new");
+        let h = B256::from([5u8; 32]);
+        let m = make_meta(7, 8, h);
+
+        repo.upsert(m.clone(), b"v1".to_vec()).await.unwrap();
+        let got = repo.get(&m).await.unwrap();
+        assert_eq!(got, b"v1".to_vec());
+
+        // overwrite
+        repo.upsert(m.clone(), b"v2".to_vec()).await.unwrap();
+        let got2 = repo.get(&m).await.unwrap();
+        assert_eq!(got2, b"v2".to_vec());
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+    #[tokio::test]
+    async fn test_entries_ignores_tmp_files() {
+        let dir = unique_test_dir("tmp_ignore_preimage");
+        let repo = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("new repo");
+
+        // Create a normal file via upsert
+        let h = B256::from([0xddu8; 32]);
+        let m = make_meta(1, 1, h);
+        repo.upsert(m.clone(), b"data".to_vec())
+            .await
+            .expect("upsert");
+
+        // Manually create a .tmp file
+        let path = repo.path(&m);
+        let tmp_path = format!("{path}.tmp");
+        tokio::fs::write(&tmp_path, b"partial data")
+            .await
+            .expect("write tmp");
+
+        // Verify entries() ignores the .tmp file by reloading metadata
+        let repo2 = FilePreimageRepository::new(&dir, Duration::from_secs(1))
+            .await
+            .expect("new repo 2");
+        let list = repo2.list_metadata(None, None).await;
+        // Should contain m, but no error or extra entry for tmp
+        assert_eq!(list.len(), 1);
+        assert_eq!(list[0], m);
+
+        tokio::fs::remove_dir_all(dir).await.ok();
+    }
+}
diff --git a/server/src/data/finalized_l1_repository.rs b/server/src/data/finalized_l1_repository.rs
new file mode 100644
index 0000000..a14f7e2
--- /dev/null
+++ b/server/src/data/finalized_l1_repository.rs
@@ -0,0 +1,11 @@
+use alloy_primitives::B256;
+use axum::async_trait;
+
+#[async_trait]
+pub trait FinalizedL1Repository: Send + Sync {
+    async fn upsert(&self, l1_head_hash: &B256, raw_finalized_l1: String) -> anyhow::Result<()>;
+
+    async fn get(&self, l1_head_hash: &B256) -> anyhow::Result<String>;
+
+    async fn purge_expired(&self) -> anyhow::Result<()>;
+}
diff --git a/server/src/data/mod.rs b/server/src/data/mod.rs
new file mode 100644
index 0000000..bcb17b4
--- /dev/null
+++ b/server/src/data/mod.rs
@@ -0,0 +1,4 @@
+pub mod file_finalized_l1_repository;
+pub mod file_preimage_repository;
+pub mod finalized_l1_repository;
+pub mod preimage_repository;
diff --git a/server/src/data/preimage_repository.rs b/server/src/data/preimage_repository.rs
new file mode 100644
index 0000000..30a3d56
--- /dev/null
+++ b/server/src/data/preimage_repository.rs
@@ -0,0 +1,105 @@
+use alloy_primitives::hex::FromHex;
+use alloy_primitives::B256;
+use axum::async_trait;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct PreimageMetadata {
+    pub agreed: u64,
+    pub claimed: u64,
+    pub l1_head: B256,
+}
+
+impl TryFrom<&str> for PreimageMetadata {
+    type Error = anyhow::Error;
+
+    fn try_from(name: &str) -> Result<Self, Self::Error> {
+        let split = name.split("_").collect::<Vec<_>>();
+        if split.len() != 3 {
+            return Err(anyhow::anyhow!("invalid preimage name: {name}"));
+        }
+        let agreed_l2: u64 = split[0].parse()?;
+        let claimed_l2: u64 = split[1].parse()?;
+        let l1_head_hash = B256::from_hex(split[2])?;
+        Ok(PreimageMetadata {
+            agreed: agreed_l2,
+            claimed: claimed_l2,
+            l1_head: l1_head_hash,
+        })
+    }
+}
+
+#[async_trait]
+pub trait PreimageRepository: Send + Sync {
+    async fn upsert(&self, metadata: PreimageMetadata, preimage: Vec<u8>) -> anyhow::Result<()>;
+
+    async fn get(&self, metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>>;
+
+    async fn list_metadata(
+        &self,
+        lt_claimed: Option<u64>,
+        gt_claimed: Option<u64>,
+    ) -> Vec<PreimageMetadata>;
+
+    async fn latest_metadata(&self) -> Option<PreimageMetadata>;
+
+    async fn purge_expired(&self) -> anyhow::Result<()>;
+}
+
+#[cfg(test)]
+mod tests {
+    use super::PreimageMetadata;
+    use alloy_primitives::B256;
+    use serde_json;
+
+    #[test]
+    fn test_parse_invalid_component_count() {
+        let bad = "1_2"; // only two parts
+        let err = PreimageMetadata::try_from(bad).unwrap_err();
+        let msg = format!("{err}");
+        assert!(msg.contains("invalid preimage name"));
+    }
+
+    #[test]
+    fn test_parse_invalid_numbers() {
+        // non-numeric agreed
+        let h = B256::from([0x33u8; 32]);
+        let s1 = format!("x_2_{h}");
+        assert!(PreimageMetadata::try_from(s1.as_str()).is_err());
+
+        // non-numeric claimed
+        let s2 = format!("1_y_{h}");
+        assert!(PreimageMetadata::try_from(s2.as_str()).is_err());
+    }
+
+    #[test]
+    fn test_parse_invalid_hash() {
+        let bad = "1_2_nothex";
+        assert!(PreimageMetadata::try_from(bad).is_err());
+    }
+
+    #[test]
+    fn test_serde_roundtrip() {
+        let m = PreimageMetadata {
+            agreed: 123,
+            claimed: 456,
+            l1_head: B256::from([0x44u8; 32]),
+ }; + let json = serde_json::to_string(&m).expect("serialize"); + let back: PreimageMetadata = serde_json::from_str(&json).expect("deserialize"); + assert_eq!(back, m); + } + + #[test] + fn test_filename_roundtrip() { + let m = PreimageMetadata { + agreed: 7, + claimed: 8, + l1_head: B256::from([0x55u8; 32]), + }; + let name = format!("{}_{}_{}", m.agreed, m.claimed, m.l1_head); + let parsed = PreimageMetadata::try_from(name.as_str()).expect("parse ok"); + assert_eq!(parsed, m); + } +} diff --git a/server/src/host/mod.rs b/server/src/derivation/host/mod.rs similarity index 100% rename from server/src/host/mod.rs rename to server/src/derivation/host/mod.rs diff --git a/server/src/host/single/config.rs b/server/src/derivation/host/single/config.rs similarity index 56% rename from server/src/host/single/config.rs rename to server/src/derivation/host/single/config.rs index 5abbacc..96660ef 100644 --- a/server/src/host/single/config.rs +++ b/server/src/derivation/host/single/config.rs @@ -34,4 +34,39 @@ pub struct Config { /// Optional L1 chain config base64 json string. (this is only required for devnet) #[clap(long)] pub l1_chain_config: Option<String>, + + /// Directory where collected preimages are stored. (ex. .preimage) + #[clap(long, default_value = ".preimage")] + pub preimage_dir: String, + + /// Directory where finalized L1 headers are stored. (ex. .finalized_l1) + #[clap(long, default_value = ".finalized_l1")] + pub finalized_l1_dir: String, + + /// Max preimage distance (from agreed to claimed) per call. + #[clap(long, default_value = "100")] + pub max_preimage_distance: u64, + + /// Max concurrency of the preimage collector. + #[clap(long, default_value = "10")] + pub max_collect_concurrency: u64, + + /// Initial claimed L2 block number, used when no preimage has been created yet. + #[clap(long)] + pub initial_claimed_l2: u64, + + /// Interval in seconds between preimage collection runs. + #[clap(long, default_value = "60")] + pub collector_interval_seconds: u64, + + /// Interval in seconds between purge runs. + #[clap(long, default_value = "86400")] + pub purger_interval_seconds: u64, + + /// TTL in seconds for stored data (default: 7 days). + #[clap(long, default_value = "604800")] + pub ttl: u64, + + /// HTTP client timeout in seconds. + #[clap(long, default_value = "30")] + pub http_client_timeout_seconds: u64, } diff --git a/server/src/host/single/handler.rs b/server/src/derivation/host/single/handler.rs similarity index 77% rename from server/src/host/single/handler.rs rename to server/src/derivation/host/single/handler.rs index 7647c59..707d16f 100644 --- a/server/src/host/single/handler.rs +++ b/server/src/derivation/host/single/handler.rs @@ -1,8 +1,8 @@ //! [SingleChainHostCli]'s [HostOrchestrator] + [DetachedHostOrchestrator] implementations.
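The handler rework below splits the old all-in-one DerivationRequest into a per-server DerivationConfig (CLI config, rollup/L1 chain configs, chain id) and a per-request DerivationRequest, paired by a Derivation value. A minimal sketch of how the two halves compose, assuming the crate paths and the Config Default impl that the inspect.rs test later in this diff relies on (all field values are illustrative):

```rust
use std::sync::Arc;

use alloy_primitives::B256;
use optimism_preimage_maker::derivation::host::single::config::Config;
use optimism_preimage_maker::derivation::host::single::handler::{
    Derivation, DerivationConfig, DerivationRequest,
};

fn main() {
    // Built once at startup and shared across all derivation calls.
    let config = Arc::new(DerivationConfig {
        config: Config::default(),
        rollup_config: None,   // None: known chain ids are resolved from the kona registry
        l1_chain_config: None, // only needed on devnet
        l2_chain_id: 901,      // illustrative devnet chain id
    });

    // Built fresh for every preimage collection request.
    let request = DerivationRequest {
        l1_head_hash: B256::repeat_byte(0x11),
        agreed_l2_head_hash: B256::repeat_byte(0x22),
        agreed_l2_output_root: B256::repeat_byte(0x33),
        l2_output_root: B256::repeat_byte(0x44),
        l2_block_number: 104,
    };

    // derivation.start() (async, not shown) would then collect the preimages.
    let derivation = Derivation { config: Arc::clone(&config), request };
    println!("claimed l2 block: {}", derivation.request.l2_block_number);
}
```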
-use crate::host::single::config::Config; -use crate::host::single::local_kv::LocalKeyValueStore; -use crate::host::single::trace::{encode_to_bytes, TracingKeyValueStore}; +use crate::derivation::host::single::config::Config; +use crate::derivation::host::single::local_kv::LocalKeyValueStore; +use crate::derivation::host::single::trace::{encode_to_bytes, TracingKeyValueStore}; use alloy_primitives::B256; use anyhow::Result; use kona_genesis::{L1ChainConfig, RollupConfig}; @@ -14,25 +14,35 @@ use kona_preimage::{ }; use kona_proof::boot::{L1_CONFIG_KEY, L2_ROLLUP_CONFIG_KEY}; use kona_proof::HintType; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::RwLock; #[derive(Debug, Clone)] -pub struct DerivationRequest { +pub struct DerivationConfig { pub config: Config, pub rollup_config: Option<RollupConfig>, pub l2_chain_id: u64, - /// for L2 derivation + /// L1 chain config, only required in devnet + pub l1_chain_config: Option<L1ChainConfig>, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct DerivationRequest { + pub l1_head_hash: B256, pub agreed_l2_head_hash: B256, pub agreed_l2_output_root: B256, - pub l1_head_hash: B256, pub l2_output_root: B256, pub l2_block_number: u64, - /// L1 chain config, only required in devnet - pub l1_chain_config: Option<L1ChainConfig>, } -impl DerivationRequest { +#[derive(Debug, Clone)] +pub struct Derivation { + pub config: Arc<DerivationConfig>, + pub request: DerivationRequest, +} + +impl Derivation { fn create_key_value_store(&self) -> Result<Arc<RwLock<TracingKeyValueStore>>> { // Only memory store is traceable // Using disk causes insufficient blob preimages in ELC because the already stored data is not traceable @@ -78,14 +88,14 @@ let preimage = BidirectionalChannel::new()?; let kv_store = self.create_key_value_store()?; let cfg = SingleChainHost { - l1_node_address: Some(self.config.l1_node_address.clone()), - l2_node_address: Some(self.config.l2_node_address.clone()), - l1_beacon_address: Some(self.config.l1_beacon_address.clone()), - l1_head: self.l1_head_hash, - agreed_l2_output_root: self.agreed_l2_output_root, - agreed_l2_head_hash: self.agreed_l2_head_hash, - claimed_l2_output_root: self.l2_output_root, - claimed_l2_block_number: self.l2_block_number, + l1_node_address: Some(self.config.config.l1_node_address.clone()), + l2_node_address: Some(self.config.config.l2_node_address.clone()), + l1_beacon_address: Some(self.config.config.l1_beacon_address.clone()), + l1_head: self.request.l1_head_hash, + agreed_l2_output_root: self.request.agreed_l2_output_root, + agreed_l2_head_hash: self.request.agreed_l2_head_hash, + claimed_l2_output_root: self.request.l2_output_root, + claimed_l2_block_number: self.request.l2_block_number, ..Default::default() }; let providers = cfg.create_providers().await?; @@ -104,12 +114,12 @@ }; // In devnet, we need to provide L1 chain config and l2 rollup config - if let Some(rollup_config) = &self.rollup_config { + if let Some(rollup_config) = &self.config.rollup_config { let local_key = PreimageKey::new_local(L2_ROLLUP_CONFIG_KEY.to()); let roll_up_config_json = serde_json::to_vec(rollup_config)?; used.insert(local_key, roll_up_config_json); } - if let Some(l1_chain_config) = &self.l1_chain_config { + if let Some(l1_chain_config) = &self.config.l1_chain_config { let local_key = PreimageKey::new_local(L1_CONFIG_KEY.to()); let l1_chain_config_json = serde_json::to_vec(l1_chain_config)?; used.insert(local_key, l1_chain_config_json); diff --git a/server/src/host/single/local_kv.rs b/server/src/derivation/host/single/local_kv.rs
similarity index 68% rename from server/src/host/single/local_kv.rs rename to server/src/derivation/host/single/local_kv.rs index e725a53..b3623ea 100644 --- a/server/src/host/single/local_kv.rs +++ b/server/src/derivation/host/single/local_kv.rs @@ -1,7 +1,7 @@ //! Contains a concrete implementation of the [KeyValueStore] trait that stores data on disk, //! using the [SingleChainHostCli] config. -use crate::host::single::handler::DerivationRequest; +use crate::derivation::host::single::handler::Derivation; use alloy_primitives::B256; use anyhow::Result; use kona_host::KeyValueStore; @@ -14,12 +14,12 @@ use kona_proof::boot::{ /// A simple, synchronous key-value store that returns data from a [SingleChainHostCli] config. #[derive(Debug)] pub struct LocalKeyValueStore { - cfg: DerivationRequest, + cfg: Derivation, } impl LocalKeyValueStore { /// Create a new [LocalKeyValueStore] with the given [SingleChainHostCli] config. - pub const fn new(cfg: DerivationRequest) -> Self { + pub const fn new(cfg: Derivation) -> Self { Self { cfg } } } @@ -28,19 +28,21 @@ impl KeyValueStore for LocalKeyValueStore { fn get(&self, key: B256) -> Option<Vec<u8>> { let preimage_key = PreimageKey::try_from(*key).ok()?; match preimage_key.key_value() { - L1_HEAD_KEY => Some(self.cfg.l1_head_hash.to_vec()), - L2_OUTPUT_ROOT_KEY => Some(self.cfg.agreed_l2_output_root.to_vec()), - L2_CLAIM_KEY => Some(self.cfg.l2_output_root.to_vec()), - L2_CLAIM_BLOCK_NUMBER_KEY => Some(self.cfg.l2_block_number.to_be_bytes().to_vec()), - L2_CHAIN_ID_KEY => Some(self.cfg.l2_chain_id.to_be_bytes().to_vec()), - L2_ROLLUP_CONFIG_KEY => match &self.cfg.rollup_config { + L1_HEAD_KEY => Some(self.cfg.request.l1_head_hash.to_vec()), + L2_OUTPUT_ROOT_KEY => Some(self.cfg.request.agreed_l2_output_root.to_vec()), + L2_CLAIM_KEY => Some(self.cfg.request.l2_output_root.to_vec()), + L2_CLAIM_BLOCK_NUMBER_KEY => { + Some(self.cfg.request.l2_block_number.to_be_bytes().to_vec()) + } + L2_CHAIN_ID_KEY => Some(self.cfg.config.l2_chain_id.to_be_bytes().to_vec()), + L2_ROLLUP_CONFIG_KEY => match &self.cfg.config.rollup_config { None => { tracing::error!("Rollup config is not provided in derivation request"); None } Some(rollup_config) => serde_json::to_vec(rollup_config).ok(), }, - L1_CONFIG_KEY => match &self.cfg.l1_chain_config { + L1_CONFIG_KEY => match &self.cfg.config.l1_chain_config { None => { tracing::error!("L1 chain config is not provided in derivation request"); None diff --git a/server/src/host/single/mod.rs b/server/src/derivation/host/single/mod.rs similarity index 100% rename from server/src/host/single/mod.rs rename to server/src/derivation/host/single/mod.rs diff --git a/server/src/host/single/trace.rs b/server/src/derivation/host/single/trace.rs similarity index 100% rename from server/src/host/single/trace.rs rename to server/src/derivation/host/single/trace.rs diff --git a/server/src/derivation/mod.rs b/server/src/derivation/mod.rs new file mode 100644 index 0000000..5361e40 --- /dev/null +++ b/server/src/derivation/mod.rs @@ -0,0 +1 @@ +pub mod host; diff --git a/server/src/lib.rs b/server/src/lib.rs index 36f4344..2c0af97 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -mod host; -mod server; -pub use server::Request; +pub mod client; +pub mod derivation; +pub mod web; -pub mod l2_client; +pub mod data; diff --git a/server/src/main.rs b/server/src/main.rs index 77f2887..08bd5d1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,18 +1,33 @@ -use crate::host::single::config::Config;
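The rewritten main.rs below wires three long-running pieces (HTTP server, collector, purger) and shuts the process down as soon as any of them stops. A stripped-down sketch of that supervision pattern, with placeholder task bodies instead of the real collector and purger logic:

```rust
use std::time::Duration;

use tokio::{select, task::JoinHandle};

// Placeholder for a periodic worker such as PreimageCollector or PreimagePurger.
fn spawn_periodic(name: &'static str, interval: Duration) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            // The real workers do one unit of work per tick (collect or purge).
            tokio::time::sleep(interval).await;
            println!("{name}: tick");
        }
    })
}

#[tokio::main]
async fn main() {
    let collector_task = spawn_periodic("collector", Duration::from_secs(60));
    let purger_task = spawn_periodic("purger", Duration::from_secs(86400));

    // The first task to finish (normally never, unless one panics) ends main.
    select! {
        result = collector_task => println!("stop collector: {result:?}"),
        result = purger_task => println!("stop purger: {result:?}"),
    }
}
```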
-use crate::server::{start_http_server_task, DerivationState}; +#![allow(dead_code)] +use crate::client::beacon_client::HttpBeaconClient; +use crate::client::l2_client::HttpL2Client; +use crate::client::l2_client::L2Client; +use crate::collector::{PreimageCollector, RealDerivationDriver}; +use crate::data::file_finalized_l1_repository::FileFinalizedL1Repository; +use crate::data::file_preimage_repository::FilePreimageRepository; +use crate::derivation::host::single::config::Config; +use crate::derivation::host::single::handler::DerivationConfig; +use crate::purger::PreimagePurger; +use crate::web::start_http_server_task; +use crate::web::SharedState; use anyhow::Context; use base64::Engine; use clap::Parser; use kona_registry::ROLLUP_CONFIGS; -use l2_client::L2Client; +use std::sync::Arc; +use std::time::Duration; +use tokio::select; use tracing::info; use tracing_subscriber::filter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -mod host; -pub mod l2_client; -mod server; +mod client; +mod collector; +mod data; +mod derivation; +mod purger; +mod web; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -25,12 +40,16 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::fmt::layer()) .with(filter) .init(); - info!("start optimism preimage-maker"); + info!("start optimism-preimage-maker"); - let l2_client = L2Client::new( + let http_client_timeout = Duration::from_secs(config.http_client_timeout_seconds); + let l2_client = HttpL2Client::new( config.l2_rollup_address.to_string(), config.l2_node_address.to_string(), + http_client_timeout, ); + let beacon_client = + HttpBeaconClient::new(config.l1_beacon_address.to_string(), http_client_timeout); let chain_id = l2_client.chain_id().await?; let (rollup_config, l1_chain_config) = if ROLLUP_CONFIGS.get(&chain_id).is_none() { // devnet only @@ -50,19 +69,68 @@ async fn main() -> anyhow::Result<()> { (None, None) }; + let derivation_config = DerivationConfig { + rollup_config, + l1_chain_config, + config: config.clone(), + l2_chain_id: chain_id, + }; + + // Start preimage collector + info!("purging ttl = {} sec", config.ttl); + let ttl = Duration::from_secs(config.ttl); + let preimage_repository = + Arc::new(FilePreimageRepository::new(&config.preimage_dir, ttl).await?); + let finalized_l1_repository = Arc::new(FileFinalizedL1Repository::new( + &config.finalized_l1_dir, + ttl, + )?); + let collector = PreimageCollector { + client: Arc::new(l2_client), + beacon_client: Arc::new(beacon_client), + derivation_driver: Arc::new(RealDerivationDriver), + config: Arc::new(derivation_config), + max_distance: config.max_preimage_distance, + initial_claimed: config.initial_claimed_l2, + interval_seconds: config.collector_interval_seconds, + preimage_repository: preimage_repository.clone(), + finalized_l1_repository: finalized_l1_repository.clone(), + max_concurrency: config.max_collect_concurrency as usize, + }; + let collector_task = tokio::spawn(async move { + collector.start().await; + }); + + let purger = PreimagePurger { + preimage_repository: preimage_repository.clone(), + finalized_l1_repository: finalized_l1_repository.clone(), + interval_seconds: config.purger_interval_seconds, + }; + let purger_task = tokio::spawn(async move { + purger.start().await; + }); + // Start HTTP server let http_server_task = start_http_server_task( config.http_server_addr.as_str(), - DerivationState { - rollup_config, - l1_chain_config, - config: config.clone(), - l2_chain_id: chain_id, + SharedState { + 
preimage_repository, + finalized_l1_repository, }, ); - let result = http_server_task.await; - info!("server result : {:?}", result); - + // Wait for the first task to stop + select! { + result = http_server_task => { + info!("stop http server: {:?}", result); + } + result = collector_task => { + info!("stop collector : {:?}", result); + } + result = purger_task => { + info!("stop purger : {:?}", result); + } + } + info!("shutdown complete"); Ok(()) } diff --git a/server/src/purger.rs b/server/src/purger.rs new file mode 100644 index 0000000..69cea35 --- /dev/null +++ b/server/src/purger.rs @@ -0,0 +1,116 @@ +use crate::data::finalized_l1_repository::FinalizedL1Repository; +use crate::data::preimage_repository::PreimageRepository; +use std::sync::Arc; +use std::time::Duration; +use tokio::time; + +pub struct PreimagePurger { + pub preimage_repository: Arc<dyn PreimageRepository>, + pub finalized_l1_repository: Arc<dyn FinalizedL1Repository>, + pub interval_seconds: u64, +} + +impl PreimagePurger { + pub async fn start(&self) { + loop { + self.run_once().await; + time::sleep(Duration::from_secs(self.interval_seconds)).await; + } + } + + pub async fn run_once(&self) { + tracing::info!("start: purge expired preimages"); + if let Err(e) = self.preimage_repository.purge_expired().await { + tracing::error!("failed to purge expired preimages: {:?}", e); + } + if let Err(e) = self.finalized_l1_repository.purge_expired().await { + tracing::error!("failed to purge expired finalized l1 heads: {:?}", e); + } + tracing::info!("end: purge expired preimages"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::preimage_repository::PreimageMetadata; + use alloy_primitives::B256; + use axum::async_trait; + use std::sync::{Arc, Mutex}; + + struct MockPreimageRepository { + pub purged: Arc<Mutex<bool>>, + } + #[async_trait] + impl PreimageRepository for MockPreimageRepository { + async fn upsert( + &self, + _metadata: PreimageMetadata, + _preimage: Vec<u8>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get(&self, _metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>> { + Ok(vec![]) + } + async fn list_metadata( + &self, + _lt_claimed: Option<u64>, + _gt_claimed: Option<u64>, + ) -> Vec<PreimageMetadata> { + vec![] + } + async fn latest_metadata(&self) -> Option<PreimageMetadata> { + None + } + async fn purge_expired(&self) -> anyhow::Result<()> { + *self.purged.lock().unwrap() = true; + Ok(()) + } + } + + struct MockFinalizedL1Repository { + pub purged: Arc<Mutex<bool>>, + } + #[async_trait] + impl FinalizedL1Repository for MockFinalizedL1Repository { + async fn upsert( + &self, + _l1_head_hash: &B256, + _raw_finalized_l1: String, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get(&self, _l1_head_hash: &B256) -> anyhow::Result<String> { + Ok("".to_string()) + } + async fn purge_expired(&self) -> anyhow::Result<()> { + *self.purged.lock().unwrap() = true; + Ok(()) + } + } + + #[tokio::test] + async fn test_purger_run_once() { + let p_purged = Arc::new(Mutex::new(false)); + let l1_purged = Arc::new(Mutex::new(false)); + + let p_repo = MockPreimageRepository { + purged: p_purged.clone(), + }; + let l1_repo = MockFinalizedL1Repository { + purged: l1_purged.clone(), + }; + + let purger = PreimagePurger { + preimage_repository: Arc::new(p_repo), + finalized_l1_repository: Arc::new(l1_repo), + interval_seconds: 1, + }; + + purger.run_once().await; + + assert!(*p_purged.lock().unwrap()); + assert!(*l1_purged.lock().unwrap()); + } +} diff --git a/server/src/server.rs deleted file mode 100644 index 4591546..0000000 --- a/server/src/server.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::host::single::config::Config;
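With the collector producing preimages in the background, the synchronous /derivation endpoint deleted below is superseded by the read-only routes in web.rs. A minimal client sketch against the new API, assuming a server on the default port used elsewhere in this diff (the route names and request types come from this patch; everything else is illustrative):

```rust
use optimism_preimage_maker::data::preimage_repository::PreimageMetadata;
use optimism_preimage_maker::web::GetPreimageRequest;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let base = "http://localhost:10080";
    let client = reqwest::Client::new();

    // Ask which claimed L2 block the collector has reached so far.
    let latest: PreimageMetadata = client
        .post(format!("{base}/get_latest_metadata"))
        .send()
        .await?
        .json()
        .await?;

    // Fetch the protobuf-encoded preimages for that range.
    let preimage = client
        .post(format!("{base}/get_preimage"))
        .json(&GetPreimageRequest {
            agreed: latest.agreed,
            claimed: latest.claimed,
            l1_head: latest.l1_head,
        })
        .send()
        .await?
        .bytes()
        .await?;

    println!("preimage blob: {} bytes", preimage.len());
    Ok(())
}
```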
-use crate::host::single::handler::DerivationRequest; -use alloy_primitives::B256; -use anyhow::{Context, Result}; -use axum::extract::State; -use axum::http::StatusCode; -use axum::routing::post; -use axum::Json; -use kona_genesis::{L1ChainConfig, RollupConfig}; -use serde::{Deserialize, Serialize}; -use std::fmt::Debug; -use std::sync::Arc; -use tokio::net::TcpListener; -use tokio::task::JoinHandle; -use tracing::{error, info}; - -pub struct DerivationState { - pub rollup_config: Option<RollupConfig>, - pub l1_chain_config: Option<L1ChainConfig>, - pub config: Config, - pub l2_chain_id: u64, -} - -async fn start_http_server(addr: &str, derivation_state: DerivationState) -> Result<()> { - let app = axum::Router::new() - .route("/derivation", post(derivation)) - .with_state(Arc::new(derivation_state)); - - let listener = TcpListener::bind(addr).await?; - tracing::info!("listening on {}", addr); - axum::serve(listener, app).await?; - Ok(()) -} - -pub fn start_http_server_task(addr: &str, state: DerivationState) -> JoinHandle<Result<()>> { - let addr = addr.to_string(); - tokio::spawn(async move { - start_http_server(&addr, state) - .await - .context("http server error") - }) -} - -// handler - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Request { - pub l1_head_hash: B256, - pub agreed_l2_head_hash: B256, - pub agreed_l2_output_root: B256, - pub l2_output_root: B256, - pub l2_block_number: u64, -} - -async fn derivation( - State(state): State<Arc<DerivationState>>, - Json(payload): Json<Request>, -) -> (StatusCode, Vec<u8>) { - info!("derivation request: {:?}", payload); - if let Err(v) = validate_request(&payload) { - return (StatusCode::BAD_REQUEST, v.as_bytes().to_vec()); - } - - let derivation = DerivationRequest { - config: state.config.clone(), - rollup_config: state.rollup_config.clone(), - l1_chain_config: state.l1_chain_config.clone(), - l2_chain_id: state.l2_chain_id, - agreed_l2_head_hash: payload.agreed_l2_head_hash, - agreed_l2_output_root: payload.agreed_l2_output_root, - l1_head_hash: payload.l1_head_hash, - l2_output_root: payload.l2_output_root, - l2_block_number: payload.l2_block_number, - }; - - match derivation.start().await { - Ok(preimage) => { - info!("derivation success"); - (StatusCode::OK, preimage) - } - Err(e) => { - info!("failed to run derivation: {:?}", e); - (StatusCode::INTERNAL_SERVER_ERROR, vec![]) - } - } -} - -fn validate_request(payload: &Request) -> Result<(), &'static str> { - if payload.agreed_l2_output_root == payload.l2_output_root { - error!("agreed_l2_output_root and l2_output_root are same value"); - return Err("agreed_l2_output_root and l2_output_root are the same value"); - } - if payload.agreed_l2_output_root.is_empty() || payload.agreed_l2_output_root.is_zero() { - error!("invalid agreed_l2_output_root",); - return Err("invalid agreed_l2_output_root"); - } - if payload.l2_output_root.is_empty() || payload.l2_output_root.is_zero() { - error!("invalid l2_output_root",); - return Err("invalid l2_output_root"); - } - if payload.l1_head_hash.is_empty() || payload.l1_head_hash.is_zero() { - error!("invalid l1_head_hash",); - return Err("invalid l1_head_hash"); - } - if payload.l2_block_number == 0 { - error!("invalid l2_block_number",); - return Err("invalid l2_block_number"); - } - Ok(()) -} diff --git a/server/src/web.rs b/server/src/web.rs new file mode 100644 index 0000000..39434be --- /dev/null +++ b/server/src/web.rs @@ -0,0 +1,424 @@ +use crate::data::finalized_l1_repository::FinalizedL1Repository; +use crate::data::preimage_repository::{PreimageMetadata, PreimageRepository}; +use 
alloy_primitives::B256; +use anyhow::{Context, Result}; +use axum::extract::State; +use axum::http::StatusCode; +use axum::routing::post; +use axum::Json; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; +use tracing::{error, info}; + +#[derive(Clone)] +pub struct SharedState { + pub preimage_repository: Arc<dyn PreimageRepository>, + pub finalized_l1_repository: Arc<dyn FinalizedL1Repository>, +} + +async fn start_http_server(addr: &str, state: SharedState) -> Result<()> { + let app = axum::Router::new() + .route("/get_preimage", post(get_preimage)) + .route("/get_latest_metadata", post(get_latest_metadata)) + .route("/list_metadata", post(list_metadata)) + .route("/get_finalized_l1", post(get_finalized_l1)) + .with_state(Arc::new(state)); + + let listener = TcpListener::bind(addr).await?; + tracing::info!("listening on {}", addr); + axum::serve(listener, app).await?; + Ok(()) +} + +pub fn start_http_server_task(addr: &str, state: SharedState) -> JoinHandle<Result<()>> { + let addr = addr.to_string(); + tokio::spawn(async move { + start_http_server(&addr, state) + .await + .context("http server error") + }) +} + +// handler +pub type GetPreimageRequest = PreimageMetadata; + +async fn get_preimage( + State(state): State<Arc<SharedState>>, + Json(payload): Json<GetPreimageRequest>, +) -> (StatusCode, Vec<u8>) { + info!("request: get_preimage: {:?}", payload); + if let Err(v) = validate_get_preimage_request(&payload) { + return (StatusCode::BAD_REQUEST, v.as_bytes().to_vec()); + } + + let result = state.preimage_repository.get(&payload).await; + match result { + Ok(preimage) => (StatusCode::OK, preimage), + Err(e) => { + error!("failed to get preimage: {:?}", e); + (StatusCode::INTERNAL_SERVER_ERROR, vec![]) + } + } +} + +fn validate_get_preimage_request(payload: &GetPreimageRequest) -> Result<(), &'static str> { + if payload.l1_head.is_empty() || payload.l1_head.is_zero() { + error!("invalid l1_head"); + return Err("invalid l1_head"); + } + if payload.claimed == 0 { + error!("invalid claimed l2_block_number"); + return Err("invalid claimed l2_block_number"); + } + if payload.agreed >= payload.claimed { + error!("invalid agreed l2_block_number"); + return Err("invalid agreed l2_block_number"); + } + Ok(()) +} + +async fn get_latest_metadata( + State(state): State<Arc<SharedState>>, +) -> (StatusCode, Json<Option<PreimageMetadata>>) { + info!("request: get_latest_metadata"); + let result = state.preimage_repository.latest_metadata().await; + match result { + Some(metadata) => { + info!("latest metadata: {:?}", metadata); + (StatusCode::OK, Json(Some(metadata))) + } + None => { + error!("failed to get latest metadata"); + (StatusCode::NOT_FOUND, Json(None)) + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ListMetadataRequest { + pub lt_claimed: u64, + pub gt_claimed: u64, +} + +async fn list_metadata( + State(state): State<Arc<SharedState>>, + Json(payload): Json<ListMetadataRequest>, +) -> (StatusCode, Json<Vec<PreimageMetadata>>) { + info!("request: list_metadata: {:?}", payload); + if payload.gt_claimed == 0 { + error!("invalid gt_claimed"); + return (StatusCode::BAD_REQUEST, Json(vec![])); + } + if payload.lt_claimed == 0 { + error!("invalid lt_claimed"); + return (StatusCode::BAD_REQUEST, Json(vec![])); + } + if payload.lt_claimed <= payload.gt_claimed { + error!( + "invalid range: lt_claimed ({}) must be greater than gt_claimed ({})", + payload.lt_claimed, payload.gt_claimed, + ); + return (StatusCode::BAD_REQUEST, Json(vec![])); + } + + let result = state + .preimage_repository + .list_metadata(Some(payload.lt_claimed), Some(payload.gt_claimed)) + .await; + (StatusCode::OK, 
Json(result)) +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct GetFinalizedL1Request { + pub l1_head_hash: B256, +} + +async fn get_finalized_l1( + State(state): State<Arc<SharedState>>, + Json(payload): Json<GetFinalizedL1Request>, +) -> (StatusCode, String) { + info!("request: get_finalized_l1: {:?}", payload); + let result = state + .finalized_l1_repository + .get(&payload.l1_head_hash) + .await; + match result { + Ok(v) => (StatusCode::OK, v), + Err(e) => { + error!( + "failed to get finalized l1: {:?}, hash:{:?}", + e, payload.l1_head_hash + ); + (StatusCode::NOT_FOUND, "".to_string()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::async_trait; + use std::sync::Mutex; + + struct MockPreimageRepository { + data: Arc<Mutex<Vec<PreimageMetadata>>>, + should_fail: bool, + } + + #[async_trait] + impl PreimageRepository for MockPreimageRepository { + async fn upsert( + &self, + _metadata: PreimageMetadata, + _preimage: Vec<u8>, + ) -> anyhow::Result<()> { + Ok(()) + } + async fn get(&self, metadata: &PreimageMetadata) -> anyhow::Result<Vec<u8>> { + if self.should_fail { + return Err(anyhow::anyhow!("mock error")); + } + if metadata.claimed == 999 { + return Ok(vec![1, 2, 3]); + } + Ok(vec![]) + } + async fn list_metadata(&self, lt: Option<u64>, gt: Option<u64>) -> Vec<PreimageMetadata> { + let data = self.data.lock().unwrap(); + data.iter() + .filter(|m| { + if let Some(l) = lt { + if m.claimed >= l { + return false; + } + } + if let Some(g) = gt { + if m.claimed <= g { + return false; + } + } + true + }) + .cloned() + .collect() + } + async fn latest_metadata(&self) -> Option<PreimageMetadata> { + if self.should_fail { + return None; + } + self.data.lock().unwrap().last().cloned() + } + async fn purge_expired(&self) -> anyhow::Result<()> { + Ok(()) + } + } + + struct MockFinalizedL1Repository { + data: Arc<Mutex<std::collections::HashMap<B256, String>>>, + } + + #[async_trait] + impl FinalizedL1Repository for MockFinalizedL1Repository { + async fn upsert( + &self, + l1_head_hash: &B256, + raw_finalized_l1: String, + ) -> anyhow::Result<()> { + self.data + .lock() + .unwrap() + .insert(*l1_head_hash, raw_finalized_l1); + Ok(()) + } + async fn get(&self, l1_head_hash: &B256) -> anyhow::Result<String> { + self.data + .lock() + .unwrap() + .get(l1_head_hash) + .cloned() + .ok_or(anyhow::anyhow!("not found")) + } + async fn purge_expired(&self) -> anyhow::Result<()> { + Ok(()) + } + } + + fn setup_state() -> Arc<SharedState> { + let repo = MockPreimageRepository { + data: Arc::new(Mutex::new(vec![ + PreimageMetadata { + l1_head: B256::repeat_byte(1), + claimed: 100, + agreed: 90, + }, + PreimageMetadata { + l1_head: B256::repeat_byte(2), + claimed: 200, + agreed: 190, + }, + ])), + should_fail: false, + }; + let l1_repo = MockFinalizedL1Repository { + data: Arc::new(Mutex::new(std::collections::HashMap::new())), + }; + Arc::new(SharedState { + preimage_repository: Arc::new(repo), + finalized_l1_repository: Arc::new(l1_repo), + }) + } + + #[tokio::test] + async fn test_get_preimage_validation() { + let state = setup_state(); + + // Invalid l1_head + let req = GetPreimageRequest { + l1_head: B256::ZERO, + claimed: 100, + agreed: 90, + }; + let (status, _) = get_preimage(State(state.clone()), Json(req)).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + + // Invalid claimed + let req = GetPreimageRequest { + l1_head: B256::repeat_byte(1), + claimed: 0, + agreed: 90, + }; + let (status, _) = get_preimage(State(state.clone()), Json(req)).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + + // Invalid agreed >= claimed + let req = GetPreimageRequest { + l1_head: B256::repeat_byte(1), + claimed: 100, + agreed: 100, + }; + let (status, _) = 
get_preimage(State(state.clone()), Json(req)).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_get_preimage_success() { + let state = setup_state(); + let req = GetPreimageRequest { + l1_head: B256::repeat_byte(1), + claimed: 999, // Trigger mock success with data + agreed: 900, + }; + let (status, data) = get_preimage(State(state), Json(req)).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(data, vec![1, 2, 3]); + } + + #[tokio::test] + async fn test_get_preimage_error() { + let repo = MockPreimageRepository { + data: Arc::new(Mutex::new(vec![])), + should_fail: true, + }; + let l1_repo = MockFinalizedL1Repository { + data: Arc::new(Mutex::new(std::collections::HashMap::new())), + }; + let state = Arc::new(SharedState { + preimage_repository: Arc::new(repo), + finalized_l1_repository: Arc::new(l1_repo), + }); + + let req = GetPreimageRequest { + l1_head: B256::repeat_byte(1), + claimed: 100, + agreed: 90, + }; + let (status, _) = get_preimage(State(state), Json(req)).await; + assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR); + } + + #[tokio::test] + async fn test_get_latest_metadata() { + let state = setup_state(); + let (status, Json(opt)) = get_latest_metadata(State(state)).await; + assert_eq!(status, StatusCode::OK); + assert!(opt.is_some()); + assert_eq!(opt.unwrap().claimed, 200); + } + + #[tokio::test] + async fn test_get_latest_metadata_empty() { + let repo = MockPreimageRepository { + data: Arc::new(Mutex::new(vec![])), + should_fail: true, // Mock behavior for None + }; + let l1_repo = MockFinalizedL1Repository { + data: Arc::new(Mutex::new(std::collections::HashMap::new())), + }; + let state = Arc::new(SharedState { + preimage_repository: Arc::new(repo), + finalized_l1_repository: Arc::new(l1_repo), + }); + + let (status, Json(opt)) = get_latest_metadata(State(state)).await; + assert_eq!(status, StatusCode::NOT_FOUND); + assert!(opt.is_none()); + } + + #[tokio::test] + async fn test_list_metadata_validation() { + let state = setup_state(); + + // lt_claimed <= gt_claimed + let req = ListMetadataRequest { + lt_claimed: 100, + gt_claimed: 100, + }; + let (status, _) = list_metadata(State(state.clone()), Json(req)).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + + // zero gt_claimed + let req = ListMetadataRequest { + lt_claimed: 100, + gt_claimed: 0, + }; + let (status, _) = list_metadata(State(state.clone()), Json(req)).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_list_metadata_success() { + let state = setup_state(); + let req = ListMetadataRequest { + lt_claimed: 210, + gt_claimed: 90, + }; + let (status, Json(vec)) = list_metadata(State(state), Json(req)).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(vec.len(), 2); + } + + #[tokio::test] + async fn test_get_finalized_l1() { + let state = setup_state(); + let hash = B256::repeat_byte(0x99); + state + .finalized_l1_repository + .upsert(&hash, "data".into()) + .await + .unwrap(); + + let req = GetFinalizedL1Request { l1_head_hash: hash }; + let (status, data) = get_finalized_l1(State(state), Json(req)).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(data, "data"); + + let req_fail = GetFinalizedL1Request { + l1_head_hash: B256::ZERO, + }; + let (status, _) = get_finalized_l1(State(setup_state()), Json(req_fail)).await; + assert_eq!(status, StatusCode::NOT_FOUND); + } +} diff --git a/server/tests/e2e.rs b/server/tests/e2e.rs index 3eec951..644b6b0 100644 --- a/server/tests/e2e.rs +++ 
b/server/tests/e2e.rs @@ -1,82 +1,68 @@ -use alloy_primitives::ChainId; +//! These tests require a running preimage server. + use optimism_derivation::derivation::Derivation; use optimism_derivation::oracle::MemoryOracleClient; use optimism_derivation::types::Preimages; -use optimism_preimage_maker::l2_client::L2Client; -use optimism_preimage_maker::Request; +use optimism_preimage_maker::client::beacon_client::LightClientFinalityUpdateResponse; +use optimism_preimage_maker::client::l2_client::{HttpL2Client, L2Client}; +use optimism_preimage_maker::data::preimage_repository::PreimageMetadata; +use optimism_preimage_maker::web::{ + GetFinalizedL1Request, GetPreimageRequest, ListMetadataRequest, +}; use prost::Message; -use serial_test::serial; use std::env; use tracing_subscriber::filter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -fn init() { +pub fn init() { let filter = filter::EnvFilter::from_default_env().add_directive("info".parse().unwrap()); let _ = tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer()) .with(filter) .try_init(); } -fn get_l2_client() -> L2Client { - let op_node_addr = format!("http://localhost:{}", env::var("L2_ROLLUP_PORT").unwrap()); - let op_geth_addr = format!("http://localhost:{}", env::var("L2_GETH_PORT").unwrap()); + +pub fn get_l2_client() -> HttpL2Client { + let op_node_addr = env::var("L2_ROLLUP_ADDR").unwrap(); + let op_geth_addr = env::var("L2_GETH_ADDR").unwrap(); tracing::info!( "Starting with op_node_addr: {} op_geth_addr: {}", op_node_addr, op_geth_addr ); - L2Client::new(op_node_addr, op_geth_addr) + HttpL2Client::new( + op_node_addr, + op_geth_addr, + std::time::Duration::from_secs(30), + ) } -async fn get_latest_derivation(l2_client: &L2Client) -> Request { - const BEHIND: u64 = 10; - const L2_COUNT: u64 = 20; - let sync_status = l2_client.sync_status().await.unwrap(); - let finalized_l2 = sync_status.finalized_l2.number; - let claiming_l2_number = finalized_l2 - BEHIND; - let agreed_l2_number = claiming_l2_number - L2_COUNT; - let claiming_output = l2_client.output_root_at(claiming_l2_number).await.unwrap(); - let agreed_l2_hash = l2_client - .get_block_by_number(agreed_l2_number) +pub async fn derivation_in_light_client( + l2_client: &HttpL2Client, + preimages: Preimages, + metadata: PreimageMetadata, +) { + let agreed_l2_output_root = l2_client + .output_root_at(metadata.agreed) .await .unwrap() - .hash; - let agreed_output = l2_client.output_root_at(agreed_l2_number).await.unwrap(); - tracing::info!( - "claimed_output: l1_origin={:?} l1={:?}", - claiming_output.block_ref.l1_origin.number, - sync_status.finalized_l1.number - ); - - Request { - l1_head_hash: sync_status.finalized_l1.hash, - agreed_l2_head_hash: agreed_l2_hash, - agreed_l2_output_root: agreed_output.output_root, - l2_output_root: claiming_output.output_root, - l2_block_number: claiming_l2_number, - } -} - -async fn success_derivation(request: Request, chain_id: ChainId) { - tracing::info!("request: {:?}", request); - - let client = reqwest::Client::new(); - let builder = client.post("http://localhost:10080/derivation"); - let preimage_bytes = builder.json(&request).send().await.unwrap(); - assert_eq!(preimage_bytes.status(), 200); - - let preimage_bytes = preimage_bytes.bytes().await.unwrap(); + .output_root; + let claimed_l2_output_root = l2_client + .output_root_at(metadata.claimed) + .await + .unwrap() + .output_root; + let chain_id = l2_client.chain_id().await.unwrap(); - tracing::info!("start 
derivation "); - let preimages = Preimages::decode(preimage_bytes).unwrap(); let oracle = MemoryOracleClient::try_from(preimages.preimages).unwrap(); let derivation = Derivation::new( - request.l1_head_hash, - request.agreed_l2_output_root, - request.l2_output_root, - request.l2_block_number, + metadata.l1_head, + agreed_l2_output_root, + claimed_l2_output_root, + metadata.claimed, ); + tracing::info!("start derivation {:?}", derivation); let result = derivation.verify(chain_id, oracle); match result { @@ -88,49 +74,72 @@ async fn success_derivation(request: Request, chain_id: ChainId) { } } -#[serial] #[tokio::test(flavor = "multi_thread")] -async fn test_make_preimages_success() { +pub async fn test_derivation_success() { init(); let l2_client = get_l2_client(); - let chain_id = l2_client.chain_id().await.unwrap(); - let request = get_latest_derivation(&l2_client).await; - success_derivation(request, chain_id).await; -} -#[serial] -#[tokio::test(flavor = "multi_thread")] -#[ignore] -async fn test_make_preimages_success_from_file() { - init(); - let path = env::var("REQUEST_PATH").unwrap(); - tracing::info!("reading request from file: {path}"); - let request = std::fs::read_to_string(path).unwrap(); - let request = serde_json::from_str::(&request).unwrap(); - let l2_client = get_l2_client(); - let chain_id = l2_client.chain_id().await.unwrap(); - success_derivation(request, chain_id).await; -} + let client = reqwest::Client::new(); + let root_path = "http://localhost:10080"; + let latest_metadata: PreimageMetadata = client + .post(format!("{root_path}/get_latest_metadata")) + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + let metadata_list: Vec = client + .post(format!("{root_path}/list_metadata")) + .json(&ListMetadataRequest { + lt_claimed: latest_metadata.claimed + 1, + gt_claimed: 100, + }) + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + assert!(!metadata_list.is_empty(), "No metadata found"); -#[serial] -#[tokio::test(flavor = "multi_thread")] -async fn test_make_preimages_error() { - init(); - let l2_client = get_l2_client(); + for metadata in metadata_list { + assert!(metadata.claimed > 100); + assert!(metadata.claimed < latest_metadata.claimed + 1); - let mut request = get_latest_derivation(&l2_client).await; + // Assert finalized l1 + let finalized_l1: LightClientFinalityUpdateResponse = client + .post(format!("{root_path}/get_finalized_l1")) + .json(&GetFinalizedL1Request { + l1_head_hash: metadata.l1_head, + }) + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + assert_eq!( + finalized_l1.data.finalized_header.execution.block_hash, + metadata.l1_head + ); - // invalid l2_output_root - request.l2_output_root = request.agreed_l2_output_root; - tracing::info!("request: {:?}", request); + // Get preimage + let preimage_bytes = client + .post(format!("{root_path}/get_preimage")) + .json(&GetPreimageRequest { + agreed: metadata.agreed, + claimed: metadata.claimed, + l1_head: metadata.l1_head, + }) + .send() + .await + .unwrap() + .bytes() + .await + .unwrap(); - let client = reqwest::Client::new(); - let builder = client.post("http://localhost:10080/derivation"); - let result = builder.json(&request).send().await.unwrap(); - assert_eq!( - result.status(), - 400, - "Derivation should fail {}", - result.status() - ); + let preimages = Preimages::decode(preimage_bytes).unwrap(); + derivation_in_light_client(&l2_client, preimages, metadata).await; + } } diff --git a/server/tests/inspect.rs b/server/tests/inspect.rs new file mode 100644 index 
0000000..353949c --- /dev/null +++ b/server/tests/inspect.rs @@ -0,0 +1,116 @@ +use optimism_derivation::types::Preimages; +use optimism_preimage_maker::derivation::host::single::config::Config; +use optimism_preimage_maker::derivation::host::single::handler::{ + Derivation, DerivationConfig, DerivationRequest, +}; +use prost::Message; +use serial_test::serial; +use std::env; +use std::sync::Arc; +use tracing::info; + +mod e2e; +use crate::e2e::derivation_in_light_client; +use e2e::get_l2_client; +use e2e::init; +use optimism_preimage_maker::client::l2_client::{Block, L2Client, RpcRequest, RpcResult}; +use optimism_preimage_maker::data::preimage_repository::PreimageMetadata; + +async fn get_block_by_number(number: u64, l1_geth_addr: &str) -> anyhow::Result<Block> { + let client = reqwest::Client::new(); + let body = RpcRequest { + method: "eth_getBlockByNumber".into(), + params: vec![format!("0x{number:X}").into(), false.into()], + ..Default::default() + }; + let response = client + .post(l1_geth_addr) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await?; + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "failed to get block by number: {response:?}" + )); + } + let result: RpcResult<Block> = response.json().await?; + Ok(result.result) +} + +/* +ex) +export L2_ROLLUP_ADDR=http://localhost:9545 +export L2_GETH_ADDR=http://localhost:8546 +export L1_GETH_ADDR=http://localhost:8545 +export L1_BEACON_ADDR=http://localhost:9596 +export CLAIMED=104 +export AGREED=103 +*/ +#[serial] +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_derivation() { + init(); + let claimed: u64 = env::var("CLAIMED").unwrap().parse().unwrap(); + let agreed: u64 = env::var("AGREED").unwrap().parse().unwrap(); + let l2_client = get_l2_client(); + let chain_id = l2_client.chain_id().await.unwrap(); + + let op_geth_addr = env::var("L2_GETH_ADDR").unwrap(); + let op_node_addr = env::var("L2_ROLLUP_ADDR").unwrap(); + let l1_geth_addr = env::var("L1_GETH_ADDR").unwrap(); + let l1_beacon_addr = env::var("L1_BEACON_ADDR").unwrap(); + + let claimed_output = l2_client.output_root_at(claimed).await.unwrap(); + let agreed_output = l2_client.output_root_at(agreed).await.unwrap(); + let l1_hash = get_block_by_number( + claimed_output.block_ref.l1_origin.number + 50, + &l1_geth_addr, + ) + .await + .unwrap() + .hash; + let request = DerivationRequest { + l1_head_hash: l1_hash, + agreed_l2_head_hash: agreed_output.block_ref.hash, + agreed_l2_output_root: agreed_output.output_root, + l2_output_root: claimed_output.output_root, + l2_block_number: claimed, + }; + + let config = Arc::new(DerivationConfig { + config: Config { + l2_node_address: op_geth_addr.to_string(), + l1_node_address: l1_geth_addr.to_string(), + l1_beacon_address: l1_beacon_addr.to_string(), + l2_rollup_address: op_node_addr.to_string(), + ..Default::default() + }, + rollup_config: None, + l2_chain_id: chain_id, + l1_chain_config: None, + }); + + info!("start derivation in preimage maker: {:?}", &request); + let derivation = Derivation { config, request }; + let result = derivation.start().await; + let preimage = match result { + Ok(preimage) => { + info!("derivation success"); + Preimages::decode(preimage.as_slice()).unwrap() + } + Err(e) => panic!("derivation failed: {e:?}"), + }; + + derivation_in_light_client( + &l2_client, + preimage, + PreimageMetadata { + agreed, + claimed, + l1_head: l1_hash, + }, + ) + .await; +} diff --git a/tool/derive.sh deleted file mode 100644 index 0e57ee4..0000000 --- 
a/tool/derive.sh +++ /dev/null @@ -1 +0,0 @@ -curl -X POST http://localhost:10080/derivation -H "Content-Type: application/json" -d @body.json \ No newline at end of file diff --git a/tool/output.sh b/tool/output.sh deleted file mode 100644 index 3b49d01..0000000 --- a/tool/output.sh +++ /dev/null @@ -1,44 +0,0 @@ -agreed=$1 -claimed=$2 -agreed_hex=$(printf "0x%x" "$agreed") -claimed_hex=$(printf "0x%x" "$claimed") - -# agreed -AGREED=$(curl -s -X POST http://localhost:9545 \ - -H "Content-Type: application/json" \ - -d "{ - \"jsonrpc\":\"2.0\", - \"method\":\"optimism_outputAtBlock\", - \"params\":[\"$agreed_hex\"], - \"id\":2 - }") -AGREED_L2_HASH=$(echo $AGREED | jq .result.blockRef.hash) -echo "agreed l2 hash: $AGREED_L2_HASH" -AGREED_L2_OUTPUT=$(echo $AGREED | jq .result.outputRoot) -echo "agreed l2 output root: $AGREED_L2_OUTPUT" - -# claimed -CLAIMED=$(curl -s -X POST http://localhost:9545 \ - -H "Content-Type: application/json" \ - -d "{ - \"jsonrpc\":\"2.0\", - \"method\":\"optimism_outputAtBlock\", - \"params\":[\"$claimed_hex\"], - \"id\":2 - }") -CLAIMED_L2_OUTPUT=$(echo $CLAIMED | jq .result.outputRoot) -echo "claimed l2 output root: $CLAIMED_L2_OUTPUT" - -CLAIMED_L1_ORIGIN=$(echo $CLAIMED | jq .result.blockRef.l1origin.number) -L1_HEAD_NUMBER=$((CLAIMED_L1_ORIGIN+30)) -echo "l1 head number: ${L1_HEAD_NUMBER}" -l1_head_num_hex=$(printf "0x%x" "$L1_HEAD_NUMBER") - -L1_HEAD_HASH=$(curl -s -X POST localhost:8545 -d "{\"method\":\"eth_getBlockByNumber\", \"jsonrpc\": \"2.0\", \"id\":1, \"params\":[\"${l1_head_num_hex}\",false]}" -H "Content-Type: application/json" | jq .result.hash) -echo "l1_head_hash: ${L1_HEAD_HASH}" - -echo "{ \"l1_head_hash\": ${L1_HEAD_HASH}, " > body.json -echo " \"agreed_l2_head_hash\": ${AGREED_L2_HASH}, " >> body.json -echo " \"agreed_l2_output_root\": ${AGREED_L2_OUTPUT}, " >> body.json -echo " \"l2_output_root\": ${CLAIMED_L2_OUTPUT}, " >> body.json -echo " \"l2_block_number\": ${claimed}} " >> body.json
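The curl helpers deleted above have no direct replacement; request bodies for the new routes are plain serde serializations of the types in web.rs, so they can be produced from any language. A sketch of what a get_preimage body looks like on the wire (values illustrative; B256 serializes as 0x-prefixed hex because alloy-primitives is built with its serde feature in this crate):

```rust
use alloy_primitives::B256;
use optimism_preimage_maker::web::GetPreimageRequest;

fn main() -> anyhow::Result<()> {
    let body = GetPreimageRequest {
        agreed: 103,
        claimed: 104,
        l1_head: B256::repeat_byte(0x11),
    };
    // e.g. {"agreed":103,"claimed":104,"l1_head":"0x1111...11"}
    println!("{}", serde_json::to_string(&body)?);
    Ok(())
}
```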