Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/checkpointer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub struct ScenarioCheckpoint {
pub num_checkpoints: usize,
/// Number of query results stored in each checkpoint snapshot.
pub num_queries: usize,
/// Number of ETL steps between each checkpoint.
///
/// This is the step count that was passed to [`ETLPipeline::run`] during
/// checkpoint generation. Consumers can use this value to replay the
/// pipeline with the same cadence.
#[serde(default)]
pub checkpoint_interval_steps: usize,
}

/// S3‑backed store for uploading and downloading checkpoint artefacts.
Expand Down Expand Up @@ -134,6 +141,7 @@ impl CheckpointStore {
&self,
scenario: &str,
local_checkpoint_dir: &Path,
checkpoint_interval_steps: usize,
) -> anyhow::Result<()> {
if !local_checkpoint_dir.is_dir() {
anyhow::bail!(
Expand Down Expand Up @@ -200,6 +208,7 @@ impl CheckpointStore {
ScenarioCheckpoint {
num_checkpoints,
num_queries,
checkpoint_interval_steps,
},
);
self.put_manifest(&manifest).await?;
Expand Down
6 changes: 5 additions & 1 deletion crates/checkpointer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ async fn main() -> anyhow::Result<()> {
cli.endpoint.as_deref(),
)?;
checkpoint_store
.upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir)
.upload_checkpoints(
SCENARIO_NAME,
&cli.checkpoint_dir,
cli.checkpoint_interval_steps as usize,
)
.await?;

tracing::info!("Checkpointer completed successfully");
Expand Down
31 changes: 28 additions & 3 deletions src/commands/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ pub(crate) async fn run(
common_args: &CommonArgs,
adbc_conn: adbc_client::AdbcConnection,
etl_pipeline: &mut ETLPipeline,
checkpoint_steps: Option<usize>,
) -> anyhow::Result<()> {
let metric_attributes = run_metric_attributes(common_args);

Expand Down Expand Up @@ -292,21 +293,45 @@ pub(crate) async fn run(
let shutdown_token = throughput_test.cancellation_token();

// --- Start the ETL pipeline (remaining batches) ---
// If checkpoint_steps is set, use `.run(steps)` so the pipeline pauses
// at checkpoint boundaries. Otherwise fall back to `.start()` which runs
// all remaining batches without pausing.
tracing::info!("Starting ETL pipeline (remaining batches)...");
let mut etl_state_rx = etl_pipeline.state_watch();
etl_pipeline.start().await?;
if let Some(steps) = checkpoint_steps {
tracing::info!(checkpoint_steps = steps, "Using checkpoint-aware ETL mode");
etl_pipeline.run(steps).await?;
} else {
etl_pipeline.start().await?;
}

let test_future = throughput_test.wait();
tokio::pin!(test_future);

// Wait for ETL pipeline completion, then cancel the test.
// Wait for ETL pipeline state changes, handling both pauses (checkpoint
// boundaries) and stops (completion / error / cancellation).
//
// When the pipeline pauses at a checkpoint boundary we immediately
// continue it. TODO: In the future this is where checkpoint-based query result
// validation would be triggered before resuming.
//
// If interrupted (ctrl-c), cancel both the test and the ETL pipeline.
let etl_error: Option<String> = loop {
tokio::select! {
// ETL state changed — check if stopped
// ETL state changed — check if stopped or paused
_ = etl_state_rx.changed() => {
let state = etl_state_rx.borrow_and_update().clone();
match state {
PipelineState::Paused => {
tracing::info!("ETL pipeline paused at checkpoint boundary");
// TODO: run checkpoint-based query result validation here
if let Err(e) = etl_pipeline.continue_pipeline() {
eprintln!("Failed to continue ETL pipeline after pause: {e}");
shutdown_token.cancel();
break Some(format!("Failed to continue ETL pipeline: {e}"));
}
tracing::info!("ETL pipeline resumed");
}
PipelineState::Stopped(StopReason::Completed) => {
println!("ETL pipeline completed, stopping benchmark...");
shutdown_token.cancel();
Expand Down
11 changes: 10 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,21 @@ async fn main() -> anyhow::Result<()> {
tracing::warn!("Failed to download checkpoint manifest - results validation will not be enabled: {e}");
e
}).ok();
let mut checkpoint_steps: Option<usize> = None;
if let Some(manifest) = manifest
&& let Some(scenario_info) = manifest.scenarios.get(&scenario_name)
{
tracing::info!(
scenario = %scenario_name,
num_checkpoints = scenario_info.num_checkpoints,
num_queries = scenario_info.num_queries,
checkpoint_interval_steps = scenario_info.checkpoint_interval_steps,
path = %checkpoint_dir.path().display(),
"Downloading checkpoints"
);
if scenario_info.checkpoint_interval_steps > 0 {
checkpoint_steps = Some(scenario_info.checkpoint_interval_steps);
}
if let Err(e) = checkpoint_store
.download_checkpoints(&scenario_name, scenario_info, checkpoint_dir.path())
.await
Expand Down Expand Up @@ -250,9 +255,13 @@ async fn main() -> anyhow::Result<()> {
}
};

commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline).await?;
commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline, checkpoint_steps).await?;

// --- Wait for ETL to finish ---
// If checkpoint_steps was set, the load runner already handled
// the pause/resume loop internally, so the pipeline should be
// in a stopped state by now. If it was started without checkpoints
// (.start()), the pipeline may still be running.
let final_state = pipeline.wait().await;
match &final_state {
PipelineState::Stopped(StopReason::Completed) => {
Expand Down