Skip to content

Commit df1e7e0

Browse files
feat: Add the CheckpointStore and connect it to spicebench (#76)
* wip * feat: Connect checkpointer store to spicebench * chore: auto-fix cargo fmt + clippy * Update lib.rs --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 249dcad commit df1e7e0

6 files changed

Lines changed: 329 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,15 @@ adbc_client = { path = "crates/adbc_client" }
138138
arrow.workspace = true
139139
arrow-schema.workspace = true
140140
async-trait.workspace = true
141+
checkpointer = { path = "crates/checkpointer" }
141142
clap.workspace = true
142143
data-generation = { path = "crates/data-generation" }
143144
etl = { path = "crates/etl" }
144145
reqwest.workspace = true
145146
serde.workspace = true
146147
serde_json.workspace = true
147148
system-adapter-protocol = { path = "crates/system-adapter-protocol" }
149+
tempfile.workspace = true
148150
test-framework = { path = "crates/test-framework" }
149151
tokio.workspace = true
150152
tokio-util.workspace = true

crates/checkpointer/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ chrono.workspace = true
1616
clap = { workspace = true, features = ["derive"] }
1717
data-generation = { path = "../data-generation" }
1818
etl = { path = "../etl", features = ["duckdb"] }
19+
object_store = { workspace = true }
20+
serde = { workspace = true, features = ["derive"] }
21+
serde_json.workspace = true
1922
system-adapter-protocol = { path = "../system-adapter-protocol" }
2023
parquet.workspace = true
2124
tokio.workspace = true

crates/checkpointer/src/lib.rs

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
//! Checkpoint object store — upload / download checkpoint artefacts to S3.
18+
//!
19+
//! ## Layout
20+
//!
21+
//! ```text
22+
//! s3://{bucket}/{prefix}/{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet
23+
//! s3://{bucket}/{prefix}/checkpoints.json ← manifest
24+
//! ```
25+
//!
26+
//! The manifest (`checkpoints.json`) contains metadata for every scenario
27+
//! that has been checkpointed under the given prefix.
28+
29+
use std::collections::HashMap;
30+
use std::path::Path;
31+
use std::sync::Arc;
32+
33+
use object_store::aws::AmazonS3Builder;
34+
use object_store::path::Path as ObjectPath;
35+
use object_store::{ObjectStore, PutPayload};
36+
use serde::{Deserialize, Serialize};
37+
38+
/// Top-level manifest persisted as `{prefix}/checkpoints.json`.
39+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
40+
pub struct CheckpointManifest {
41+
/// Map from scenario name to its metadata.
42+
pub scenarios: HashMap<String, ScenarioCheckpoint>,
43+
}
44+
45+
/// Per-scenario metadata stored inside the manifest.
46+
#[derive(Debug, Clone, Serialize, Deserialize)]
47+
pub struct ScenarioCheckpoint {
48+
/// Total number of checkpoint snapshots stored for this scenario.
49+
pub num_checkpoints: usize,
50+
/// Number of query results stored in each checkpoint snapshot.
51+
pub num_queries: usize,
52+
}
53+
54+
/// S3‑backed store for uploading and downloading checkpoint artefacts.
55+
pub struct CheckpointStore {
56+
store: Arc<dyn ObjectStore>,
57+
#[allow(dead_code)]
58+
bucket: String,
59+
prefix: String,
60+
}
61+
62+
impl CheckpointStore {
63+
/// Build a new `CheckpointStore` from raw S3 connection parameters.
64+
///
65+
/// `prefix` may be empty; it is the key‑prefix shared by all scenarios
66+
/// (e.g. `"run-42"`).
67+
pub fn new(
68+
bucket: &str,
69+
prefix: &str,
70+
region: Option<&str>,
71+
endpoint: Option<&str>,
72+
) -> anyhow::Result<Self> {
73+
let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket);
74+
75+
if let Some(region) = region {
76+
builder = builder.with_region(region);
77+
}
78+
if let Some(endpoint) = endpoint
79+
&& !endpoint.is_empty()
80+
{
81+
builder = builder.with_endpoint(endpoint);
82+
if endpoint.starts_with("http://") {
83+
builder = builder.with_allow_http(true);
84+
}
85+
}
86+
87+
let store = Arc::new(builder.build()?);
88+
Ok(Self {
89+
store,
90+
bucket: bucket.to_owned(),
91+
prefix: prefix.to_owned(),
92+
})
93+
}
94+
95+
fn object_path(&self, suffix: &str) -> ObjectPath {
96+
if self.prefix.is_empty() {
97+
ObjectPath::from(suffix.to_owned())
98+
} else {
99+
ObjectPath::from(format!("{}/{suffix}", self.prefix))
100+
}
101+
}
102+
103+
fn manifest_path(&self) -> ObjectPath {
104+
self.object_path("checkpoints.json")
105+
}
106+
107+
fn checkpoint_parquet_path(
108+
&self,
109+
scenario: &str,
110+
checkpoint_idx: usize,
111+
query_idx: usize,
112+
) -> ObjectPath {
113+
self.object_path(&format!(
114+
"{scenario}/checkpoints/{checkpoint_idx}/{query_idx}.parquet"
115+
))
116+
}
117+
118+
/// Upload all checkpoint parquet files from `local_checkpoint_dir` to S3,
119+
/// then update (merge into) the manifest at `{prefix}/checkpoints.json`.
120+
///
121+
/// The local directory is expected to have the layout produced by the
122+
/// checkpointer binary:
123+
///
124+
/// ```text
125+
/// {local_checkpoint_dir}/
126+
/// 0/
127+
/// 0.parquet
128+
/// 1.parquet
129+
/// 1/
130+
/// 0.parquet
131+
/// ...
132+
/// ```
133+
pub async fn upload_checkpoints(
134+
&self,
135+
scenario: &str,
136+
local_checkpoint_dir: &Path,
137+
) -> anyhow::Result<()> {
138+
if !local_checkpoint_dir.is_dir() {
139+
anyhow::bail!(
140+
"Checkpoint directory does not exist: {}",
141+
local_checkpoint_dir.display()
142+
);
143+
}
144+
145+
let mut num_checkpoints: usize = 0;
146+
let mut num_queries: usize = 0;
147+
148+
// Iterate over checkpoint index directories (0, 1, 2, …).
149+
let mut checkpoint_dirs: Vec<_> = std::fs::read_dir(local_checkpoint_dir)?
150+
.filter_map(Result::ok)
151+
.filter(|e| e.path().is_dir())
152+
.collect();
153+
checkpoint_dirs.sort_by_key(|e| e.file_name());
154+
155+
for checkpoint_entry in &checkpoint_dirs {
156+
let checkpoint_idx: usize = checkpoint_entry
157+
.file_name()
158+
.to_string_lossy()
159+
.parse()
160+
.unwrap_or(0);
161+
162+
let mut query_files: Vec<_> = std::fs::read_dir(checkpoint_entry.path())?
163+
.filter_map(Result::ok)
164+
.filter(|e| e.path().extension().is_some_and(|ext| ext == "parquet"))
165+
.collect();
166+
query_files.sort_by_key(|e| e.file_name());
167+
168+
for qf in &query_files {
169+
let q_idx: usize = qf
170+
.path()
171+
.file_stem()
172+
.and_then(|s| s.to_str())
173+
.and_then(|s| s.parse().ok())
174+
.unwrap_or(0);
175+
176+
let bytes = std::fs::read(qf.path())?;
177+
let dest = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx);
178+
179+
tracing::info!(
180+
scenario,
181+
checkpoint = checkpoint_idx,
182+
query = q_idx,
183+
dest = %dest,
184+
"Uploading checkpoint parquet"
185+
);
186+
self.store.put(&dest, PutPayload::from(bytes)).await?;
187+
188+
if q_idx >= num_queries {
189+
num_queries = q_idx + 1;
190+
}
191+
}
192+
193+
num_checkpoints = checkpoint_idx + 1;
194+
}
195+
196+
// Merge into manifest.
197+
let mut manifest = self.download_manifest().await.unwrap_or_default();
198+
manifest.scenarios.insert(
199+
scenario.to_owned(),
200+
ScenarioCheckpoint {
201+
num_checkpoints,
202+
num_queries,
203+
},
204+
);
205+
self.put_manifest(&manifest).await?;
206+
207+
tracing::info!(
208+
scenario,
209+
num_checkpoints,
210+
num_queries,
211+
"Checkpoint upload complete"
212+
);
213+
Ok(())
214+
}
215+
216+
/// Upload (overwrite) the manifest JSON.
217+
async fn put_manifest(&self, manifest: &CheckpointManifest) -> anyhow::Result<()> {
218+
let json = serde_json::to_vec_pretty(manifest)?;
219+
self.store
220+
.put(&self.manifest_path(), PutPayload::from(json))
221+
.await?;
222+
Ok(())
223+
}
224+
225+
/// Download and deserialise the manifest from S3.
226+
///
227+
/// Returns `Ok(manifest)` or an error if the manifest does not exist or
228+
/// cannot be parsed.
229+
pub async fn download_manifest(&self) -> anyhow::Result<CheckpointManifest> {
230+
let data = self.store.get(&self.manifest_path()).await?.bytes().await?;
231+
let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
232+
Ok(manifest)
233+
}
234+
235+
/// Download all checkpoint parquet files for `scenario` into
236+
/// `local_dir/{checkpoint_idx}/{query_idx}.parquet`.
237+
pub async fn download_checkpoints(
238+
&self,
239+
scenario: &str,
240+
info: &ScenarioCheckpoint,
241+
local_dir: &Path,
242+
) -> anyhow::Result<()> {
243+
for checkpoint_idx in 0..info.num_checkpoints {
244+
let checkpoint_dir = local_dir.join(checkpoint_idx.to_string());
245+
std::fs::create_dir_all(&checkpoint_dir)?;
246+
247+
for q_idx in 0..info.num_queries {
248+
let remote = self.checkpoint_parquet_path(scenario, checkpoint_idx, q_idx);
249+
let data = self.store.get(&remote).await?.bytes().await?;
250+
let local_path = checkpoint_dir.join(format!("{q_idx}.parquet"));
251+
std::fs::write(&local_path, &data)?;
252+
253+
tracing::info!(
254+
scenario,
255+
checkpoint = checkpoint_idx,
256+
query = q_idx,
257+
path = %local_path.display(),
258+
"Downloaded checkpoint parquet"
259+
);
260+
}
261+
}
262+
263+
Ok(())
264+
}
265+
}

crates/checkpointer/src/main.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::path::{Path, PathBuf};
1919
use std::sync::Arc;
2020

2121
use arrow::array::RecordBatch;
22+
use checkpointer::CheckpointStore;
2223
use clap::Parser;
2324
use data_generation::config::{DatasetConfig, TargetConfig};
2425
use data_generation::dataset::MutationConfig;
@@ -28,6 +29,9 @@ use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
2829
use parquet::arrow::ArrowWriter;
2930
use tracing_subscriber::EnvFilter;
3031

32+
/// Static scenario name used until we derive it from the scenario configuration.
33+
const SCENARIO_NAME: &str = "default";
34+
3135
/// Static list of checkpoint queries to run against the DuckDB database at
3236
/// each checkpoint boundary.
3337
const CHECKPOINT_QUERIES: &[&str] = &[
@@ -231,6 +235,18 @@ async fn main() -> anyhow::Result<()> {
231235
"Pipeline completed, running final checkpoint queries"
232236
);
233237
run_checkpoint_queries(&target, &cli.checkpoint_dir, checkpoint_idx).await?;
238+
239+
// Upload all checkpoints to S3.
240+
let checkpoint_store = CheckpointStore::new(
241+
&cli.bucket,
242+
&cli.source_prefix,
243+
cli.region.as_deref(),
244+
cli.endpoint.as_deref(),
245+
)?;
246+
checkpoint_store
247+
.upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir)
248+
.await?;
249+
234250
tracing::info!("Checkpointer completed successfully");
235251
break;
236252
}

0 commit comments

Comments
 (0)