From ac6af45c7f07fc52ab98bfd072a2aa86bd97d57e Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 19:29:14 -0600 Subject: [PATCH 1/4] wip --- Cargo.lock | 3 + crates/checkpointer/Cargo.toml | 3 + crates/checkpointer/src/lib.rs | 265 ++++++++++++++++++++++++++++++++ crates/checkpointer/src/main.rs | 16 ++ 4 files changed, 287 insertions(+) create mode 100644 crates/checkpointer/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 637dae3a..e7bb87dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,7 +938,10 @@ dependencies = [ "clap", "data-generation", "etl", + "object_store", "parquet", + "serde", + "serde_json", "system-adapter-protocol", "tokio", "tokio-util", diff --git a/crates/checkpointer/Cargo.toml b/crates/checkpointer/Cargo.toml index 4f533dd5..ebf44ab6 100644 --- a/crates/checkpointer/Cargo.toml +++ b/crates/checkpointer/Cargo.toml @@ -16,6 +16,9 @@ chrono.workspace = true clap = { workspace = true, features = ["derive"] } data-generation = { path = "../data-generation" } etl = { path = "../etl", features = ["duckdb"] } +object_store = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true system-adapter-protocol = { path = "../system-adapter-protocol" } parquet.workspace = true tokio.workspace = true diff --git a/crates/checkpointer/src/lib.rs b/crates/checkpointer/src/lib.rs new file mode 100644 index 00000000..a8878e3b --- /dev/null +++ b/crates/checkpointer/src/lib.rs @@ -0,0 +1,265 @@ +/* +Copyright 2024-2025 The Spice.ai OSS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +//! Checkpoint object store — upload / download checkpoint artefacts to S3. +//! +//! ## Layout +//! +//! ```text +//! s3://{bucket}/{prefix}/{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet +//! s3://{bucket}/{prefix}/checkpoints.json ← manifest +//! ``` +//! +//! The manifest (`checkpoints.json`) contains metadata for every scenario +//! that has been checkpointed under the given prefix. + +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; + +use object_store::aws::AmazonS3Builder; +use object_store::path::Path as ObjectPath; +use object_store::{ObjectStore, PutPayload}; +use serde::{Deserialize, Serialize}; + +/// Top-level manifest persisted as `{prefix}/checkpoints.json`. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct CheckpointManifest { + /// Map from scenario name to its metadata. + pub scenarios: HashMap, +} + +/// Per-scenario metadata stored inside the manifest. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScenarioCheckpoint { + /// Total number of checkpoint snapshots stored for this scenario. + pub num_checkpoints: usize, + /// Number of query results stored in each checkpoint snapshot. + pub num_queries: usize, +} + +/// S3‑backed store for uploading and downloading checkpoint artefacts. +pub struct CheckpointStore { + store: Arc, + #[allow(dead_code)] + bucket: String, + prefix: String, +} + +impl CheckpointStore { + /// Build a new `CheckpointStore` from raw S3 connection parameters. + /// + /// `prefix` may be empty; it is the key‑prefix shared by all scenarios + /// (e.g. `"run-42"`). + pub fn new( + bucket: &str, + prefix: &str, + region: Option<&str>, + endpoint: Option<&str>, + ) -> anyhow::Result { + let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket); + + if let Some(region) = region { + builder = builder.with_region(region); + } + if let Some(endpoint) = endpoint + && !endpoint.is_empty() + { + builder = builder.with_endpoint(endpoint); + if endpoint.starts_with("http://") { + builder = builder.with_allow_http(true); + } + } + + let store = Arc::new(builder.build()?); + Ok(Self { + store, + bucket: bucket.to_owned(), + prefix: prefix.to_owned(), + }) + } + + fn object_path(&self, suffix: &str) -> ObjectPath { + if self.prefix.is_empty() { + ObjectPath::from(suffix.to_owned()) + } else { + ObjectPath::from(format!("{}/{suffix}", self.prefix)) + } + } + + fn manifest_path(&self) -> ObjectPath { + self.object_path("checkpoints.json") + } + + fn checkpoint_parquet_path( + &self, + scenario: &str, + checkpoint_idx: usize, + query_idx: usize, + ) -> ObjectPath { + self.object_path(&format!( + "{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet" + )) + } + + /// Upload all checkpoint parquet files from `local_checkpoint_dir` to S3, + /// then update (merge into) the manifest at `{prefix}/checkpoints.json`. + /// + /// The local directory is expected to have the layout produced by the + /// checkpointer binary: + /// + /// ```text + /// {local_checkpoint_dir}/ + /// 0/ + /// 0.parquet + /// 1.parquet + /// 1/ + /// 0.parquet + /// ... + /// ``` + pub async fn upload_checkpoints( + &self, + scenario: &str, + local_checkpoint_dir: &Path, + ) -> anyhow::Result<()> { + if !local_checkpoint_dir.is_dir() { + anyhow::bail!( + "Checkpoint directory does not exist: {}", + local_checkpoint_dir.display() + ); + } + + let mut num_checkpoints: usize = 0; + let mut num_queries: usize = 0; + + // Iterate over checkpoint index directories (0, 1, 2, …). + let mut checkpoint_dirs: Vec<_> = std::fs::read_dir(local_checkpoint_dir)? + .filter_map(Result::ok) + .filter(|e| e.path().is_dir()) + .collect(); + checkpoint_dirs.sort_by_key(|e| e.file_name()); + + for checkpoint_entry in &checkpoint_dirs { + let checkpoint_idx: usize = checkpoint_entry + .file_name() + .to_string_lossy() + .parse() + .unwrap_or(0); + + let mut query_files: Vec<_> = std::fs::read_dir(checkpoint_entry.path())? + .filter_map(Result::ok) + .filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet")) + .collect(); + query_files.sort_by_key(|e| e.file_name()); + + for qf in &query_files { + let q_idx: usize = qf + .path() + .file_stem() + .and_then(|s| s.to_str()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let bytes = std::fs::read(qf.path())?; + let dest = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx); + + tracing::info!( + scenario, + checkpoint = checkpoint_idx, + query = q_idx, + dest = %dest, + "Uploading checkpoint parquet" + ); + self.store.put(&dest, PutPayload::from(bytes)).await?; + + if q_idx >= num_queries { + num_queries = q_idx + 1; + } + } + + num_checkpoints = checkpoint_idx + 1; + } + + // Merge into manifest. + let mut manifest = self.download_manifest().await.unwrap_or_default(); + manifest.scenarios.insert( + scenario.to_owned(), + ScenarioCheckpoint { + num_checkpoints, + num_queries, + }, + ); + self.put_manifest(&manifest).await?; + + tracing::info!( + scenario, + num_checkpoints, + num_queries, + "Checkpoint upload complete" + ); + Ok(()) + } + + /// Upload (overwrite) the manifest JSON. + async fn put_manifest(&self, manifest: &CheckpointManifest) -> anyhow::Result<()> { + let json = serde_json::to_vec_pretty(manifest)?; + self.store + .put(&self.manifest_path(), PutPayload::from(json)) + .await?; + Ok(()) + } + + /// Download and deserialise the manifest from S3. + /// + /// Returns `Ok(manifest)` or an error if the manifest does not exist or + /// cannot be parsed. + pub async fn download_manifest(&self) -> anyhow::Result { + let data = self.store.get(&self.manifest_path()).await?.bytes().await?; + let manifest: CheckpointManifest = serde_json::from_slice(&data)?; + Ok(manifest) + } + + /// Download all checkpoint parquet files for `scenario` into + /// `local_dir/{checkpoint_idx}/{query_idx}.parquet`. + pub async fn download_checkpoints( + &self, + scenario: &str, + info: &ScenarioCheckpoint, + local_dir: &Path, + ) -> anyhow::Result<()> { + for checkpoint_idx in 0..info.num_checkpoints { + let checkpoint_dir = local_dir.join(checkpoint_idx.to_string()); + std::fs::create_dir_all(&checkpoint_dir)?; + + for q_idx in 0..info.num_queries { + let remote = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx); + let data = self.store.get(&remote).await?.bytes().await?; + let local_path = checkpoint_dir.join(format!("{q_idx}.parquet")); + std::fs::write(&local_path, &data)?; + + tracing::info!( + scenario, + checkpoint = checkpoint_idx, + query = q_idx, + path = %local_path.display(), + "Downloaded checkpoint parquet" + ); + } + } + + Ok(()) + } +} diff --git a/crates/checkpointer/src/main.rs b/crates/checkpointer/src/main.rs index 826523bd..9f6d7757 100644 --- a/crates/checkpointer/src/main.rs +++ b/crates/checkpointer/src/main.rs @@ -19,6 +19,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use arrow::array::RecordBatch; +use checkpointer::CheckpointStore; use clap::Parser; use data_generation::config::{DatasetConfig, TargetConfig}; use data_generation::dataset::MutationConfig; @@ -28,6 +29,9 @@ use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason}; use parquet::arrow::ArrowWriter; use tracing_subscriber::EnvFilter; +/// Static scenario name used until we derive it from the scenario configuration. +const SCENARIO_NAME: &str = "default"; + /// Static list of checkpoint queries to run against the DuckDB database at /// each checkpoint boundary. const CHECKPOINT_QUERIES: &[&str] = &[ @@ -231,6 +235,18 @@ async fn main() -> anyhow::Result<()> { "Pipeline completed, running final checkpoint queries" ); run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?; + + // Upload all checkpoints to S3. + let checkpoint_store = CheckpointStore::new( + &cli.bucket, + &cli.source_prefix, + cli.region.as_deref(), + cli.endpoint.as_deref(), + )?; + checkpoint_store + .upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir) + .await?; + tracing::info!("Checkpointer completed successfully"); break; } From b1b04109132e3ae3b6b982a5fefbb22dd42da2d4 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 19:39:32 -0600 Subject: [PATCH 2/4] feat: Connect checkpointer store to spicebench --- Cargo.lock | 2 ++ Cargo.toml | 2 ++ src/main.rs | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index e7bb87dc..af1ffddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4953,6 +4953,7 @@ dependencies = [ "arrow", "arrow-schema", "async-trait", + "checkpointer", "clap", "data-generation", "etl", @@ -4960,6 +4961,7 @@ dependencies = [ "serde", "serde_json", "system-adapter-protocol", + "tempfile", "test-framework", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 2d1b13f7..94a7f93d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ adbc_client = { path = "crates/adbc_client" } arrow.workspace = true arrow-schema.workspace = true async-trait.workspace = true +checkpointer = { path = "crates/checkpointer" } clap.workspace = true data-generation = { path = "crates/data-generation" } etl = { path = "crates/etl" } @@ -145,6 +146,7 @@ reqwest.workspace = true serde.workspace = true serde_json.workspace = true system-adapter-protocol = { path = "crates/system-adapter-protocol" } +tempfile.workspace = true test-framework = { path = "crates/test-framework" } tokio.workspace = true tokio-util.workspace = true diff --git a/src/main.rs b/src/main.rs index 60d71539..a13c50c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use adbc_client::AdbcConnection; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use checkpointer::CheckpointStore; use clap::Parser; use data_generation::config::{DatasetConfig as GenerationDatasetConfig, TargetConfig}; use data_generation::dataset::Dataset; @@ -145,6 +146,40 @@ async fn main() -> anyhow::Result<()> { } }; + // --- Download checkpoints from S3 --- + let scenario_name = cli.common.scenario.to_string(); + let checkpoint_dir = tempfile::tempdir()?; + + let checkpoint_store = CheckpointStore::new( + &cli.common.etl_bucket, + &cli.common.etl_source_prefix, + cli.common.etl_region.as_deref(), + cli.common.etl_endpoint.as_deref(), + )?; + + let manifest = checkpoint_store.download_manifest().await?; + if let Some(scenario_info) = manifest.scenarios.get(&scenario_name) { + tracing::info!( + scenario = %scenario_name, + num_checkpoints = scenario_info.num_checkpoints, + num_queries = scenario_info.num_queries, + path = %checkpoint_dir.path().display(), + "Downloading checkpoints" + ); + if let Err(e) = checkpoint_store + .download_checkpoints(&scenario_name, scenario_info, checkpoint_dir.path()) + .await { + tracing::warn!("Failed to download checkpoints - results validation will not be enabled: {e}"); + } else { + tracing::info!(scenario = %scenario_name, "Checkpoints downloaded"); + } + } else { + tracing::warn!( + scenario = %scenario_name, + "No checkpoints found for scenario in manifest" + ); + } + let driver_name = adbc_driver.driver.to_string(); let sink_kwargs = adbc_driver.db_kwargs.clone(); let load_kwargs = adbc_driver.db_kwargs; From 67de4fdb10fa475a0ac5cae73c35e3b3c9b3f7cd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 18 Feb 2026 17:43:36 -0800 Subject: [PATCH 3/4] chore: auto-fix cargo fmt + clippy --- src/main.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index a13c50c6..d38c3204 100644 --- a/src/main.rs +++ b/src/main.rs @@ -168,11 +168,14 @@ async fn main() -> anyhow::Result<()> { ); if let Err(e) = checkpoint_store .download_checkpoints(&scenario_name, scenario_info, checkpoint_dir.path()) - .await { - tracing::warn!("Failed to download checkpoints - results validation will not be enabled: {e}"); - } else { - tracing::info!(scenario = %scenario_name, "Checkpoints downloaded"); - } + .await + { + tracing::warn!( + "Failed to download checkpoints - results validation will not be enabled: {e}" + ); + } else { + tracing::info!(scenario = %scenario_name, "Checkpoints downloaded"); + } } else { tracing::warn!( scenario = %scenario_name, From f50fae92e0b70ef2f524642be20c34c87b67c350 Mon Sep 17 00:00:00 2001 From: William <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 19:45:00 -0600 Subject: [PATCH 4/4] Update lib.rs