Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ adbc_client = { path = "crates/adbc_client" }
arrow.workspace = true
arrow-schema.workspace = true
async-trait.workspace = true
checkpointer = { path = "crates/checkpointer" }
clap.workspace = true
data-generation = { path = "crates/data-generation" }
etl = { path = "crates/etl" }
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
system-adapter-protocol = { path = "crates/system-adapter-protocol" }
tempfile.workspace = true
test-framework = { path = "crates/test-framework" }
tokio.workspace = true
tokio-util.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/checkpointer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
data-generation = { path = "../data-generation" }
etl = { path = "../etl", features = ["duckdb"] }
object_store = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
system-adapter-protocol = { path = "../system-adapter-protocol" }
parquet.workspace = true
tokio.workspace = true
Expand Down
265 changes: 265 additions & 0 deletions crates/checkpointer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/*
Copyright 2024-2025 The Spice.ai OSS Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//! Checkpoint object store — upload / download checkpoint artefacts to S3.
//!
//! ## Layout
//!
//! ```text
//! s3://{bucket}/{prefix}/{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet
//! s3://{bucket}/{prefix}/checkpoints.json ← manifest
//! ```
//!
//! The manifest (`checkpoints.json`) contains metadata for every scenario
//! that has been checkpointed under the given prefix.

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

use object_store::aws::AmazonS3Builder;
use object_store::path::Path as ObjectPath;
use object_store::{ObjectStore, PutPayload};
use serde::{Deserialize, Serialize};

/// Top-level manifest persisted as `{prefix}/checkpoints.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CheckpointManifest {
/// Map from scenario name to its metadata.
pub scenarios: HashMap<String, ScenarioCheckpoint>,
}

/// Per-scenario metadata stored inside the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioCheckpoint {
/// Total number of checkpoint snapshots stored for this scenario.
pub num_checkpoints: usize,
/// Number of query results stored in each checkpoint snapshot.
pub num_queries: usize,
}

/// S3‑backed store for uploading and downloading checkpoint artefacts.
pub struct CheckpointStore {
store: Arc<dyn ObjectStore>,
#[allow(dead_code)]
bucket: String,
prefix: String,
}

impl CheckpointStore {
/// Build a new `CheckpointStore` from raw S3 connection parameters.
///
/// `prefix` may be empty; it is the key‑prefix shared by all scenarios
/// (e.g. `"run-42"`).
pub fn new(
bucket: &str,
prefix: &str,
region: Option<&str>,
endpoint: Option<&str>,
) -> anyhow::Result<Self> {
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket);

if let Some(region) = region {
builder = builder.with_region(region);
}
if let Some(endpoint) = endpoint
&& !endpoint.is_empty()
{
builder = builder.with_endpoint(endpoint);
if endpoint.starts_with("http://") {
builder = builder.with_allow_http(true);
}
}

let store = Arc::new(builder.build()?);
Ok(Self {
store,
bucket: bucket.to_owned(),
prefix: prefix.to_owned(),
})
}

fn object_path(&self, suffix: &str) -> ObjectPath {
if self.prefix.is_empty() {
ObjectPath::from(suffix.to_owned())
} else {
ObjectPath::from(format!("{}/{suffix}", self.prefix))
}
}

fn manifest_path(&self) -> ObjectPath {
self.object_path("checkpoints.json")
}

fn checkpoint_parquet_path(
&self,
scenario: &str,
checkpoint_idx: usize,
query_idx: usize,
) -> ObjectPath {
self.object_path(&format!(
"{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet"
))
}

/// Upload all checkpoint parquet files from `local_checkpoint_dir` to S3,
/// then update (merge into) the manifest at `{prefix}/checkpoints.json`.
///
/// The local directory is expected to have the layout produced by the
/// checkpointer binary:
///
/// ```text
/// {local_checkpoint_dir}/
/// 0/
/// 0.parquet
/// 1.parquet
/// 1/
/// 0.parquet
/// ...
/// ```
pub async fn upload_checkpoints(
&self,
scenario: &str,
local_checkpoint_dir: &Path,
) -> anyhow::Result<()> {
if !local_checkpoint_dir.is_dir() {
anyhow::bail!(
"Checkpoint directory does not exist: {}",
local_checkpoint_dir.display()
);
}

let mut num_checkpoints: usize = 0;
let mut num_queries: usize = 0;

// Iterate over checkpoint index directories (0, 1, 2, …).
let mut checkpoint_dirs: Vec<_> = std::fs::read_dir(local_checkpoint_dir)?
.filter_map(Result::ok)
.filter(|e| e.path().is_dir())
.collect();
checkpoint_dirs.sort_by_key(|e| e.file_name());

for checkpoint_entry in &checkpoint_dirs {
let checkpoint_idx: usize = checkpoint_entry
.file_name()
.to_string_lossy()
.parse()
.unwrap_or(0);

let mut query_files: Vec<_> = std::fs::read_dir(checkpoint_entry.path())?
.filter_map(Result::ok)
.filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
.collect();
query_files.sort_by_key(|e| e.file_name());

for qf in &query_files {
let q_idx: usize = qf
.path()
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse().ok())
.unwrap_or(0);

let bytes = std::fs::read(qf.path())?;
let dest = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx);

tracing::info!(
scenario,
checkpoint = checkpoint_idx,
query = q_idx,
dest = %dest,
"Uploading checkpoint parquet"
);
self.store.put(&dest, PutPayload::from(bytes)).await?;

if q_idx >= num_queries {
num_queries = q_idx + 1;
}
}

num_checkpoints = checkpoint_idx + 1;
}

// Merge into manifest.
let mut manifest = self.download_manifest().await.unwrap_or_default();
manifest.scenarios.insert(
scenario.to_owned(),
ScenarioCheckpoint {
num_checkpoints,
num_queries,
},
);
self.put_manifest(&manifest).await?;

tracing::info!(
scenario,
num_checkpoints,
num_queries,
"Checkpoint upload complete"
);
Ok(())
}

/// Upload (overwrite) the manifest JSON.
async fn put_manifest(&self, manifest: &CheckpointManifest) -> anyhow::Result<()> {
let json = serde_json::to_vec_pretty(manifest)?;
self.store
.put(&self.manifest_path(), PutPayload::from(json))
.await?;
Ok(())
}

/// Download and deserialise the manifest from S3.
///
/// Returns `Ok(manifest)` or an error if the manifest does not exist or
/// cannot be parsed.
pub async fn download_manifest(&self) -> anyhow::Result<CheckpointManifest> {
let data = self.store.get(&self.manifest_path()).await?.bytes().await?;
let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
Ok(manifest)
}

/// Download all checkpoint parquet files for `scenario` into
/// `local_dir/{checkpoint_idx}/{query_idx}.parquet`.
pub async fn download_checkpoints(
&self,
scenario: &str,
info: &ScenarioCheckpoint,
local_dir: &Path,
) -> anyhow::Result<()> {
for checkpoint_idx in 0..info.num_checkpoints {
let checkpoint_dir = local_dir.join(checkpoint_idx.to_string());
std::fs::create_dir_all(&checkpoint_dir)?;

for q_idx in 0..info.num_queries {
let remote = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx);
let data = self.store.get(&remote).await?.bytes().await?;
let local_path = checkpoint_dir.join(format!("{q_idx}.parquet"));
std::fs::write(&local_path, &data)?;

tracing::info!(
scenario,
checkpoint = checkpoint_idx,
query = q_idx,
path = %local_path.display(),
"Downloaded checkpoint parquet"
);
}
}

Ok(())
}
}
16 changes: 16 additions & 0 deletions crates/checkpointer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::RecordBatch;
use checkpointer::CheckpointStore;
use clap::Parser;
use data_generation::config::{DatasetConfig, TargetConfig};
use data_generation::dataset::MutationConfig;
Expand All @@ -28,6 +29,9 @@ use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
use parquet::arrow::ArrowWriter;
use tracing_subscriber::EnvFilter;

/// Static scenario name used until we derive it from the scenario configuration.
const SCENARIO_NAME: &str = "default";

/// Static list of checkpoint queries to run against the DuckDB database at
/// each checkpoint boundary.
const CHECKPOINT_QUERIES: &[&str] = &[
Expand Down Expand Up @@ -231,6 +235,18 @@ async fn main() -> anyhow::Result<()> {
"Pipeline completed, running final checkpoint queries"
);
run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?;

// Upload all checkpoints to S3.
let checkpoint_store = CheckpointStore::new(
&cli.bucket,
&cli.source_prefix,
cli.region.as_deref(),
cli.endpoint.as_deref(),
)?;
checkpoint_store
.upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir)
.await?;

tracing::info!("Checkpointer completed successfully");
break;
}
Expand Down
Loading