From 616989eeb46f7155473bf0151b2bc2f3015734a2 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 18 Feb 2026 20:41:11 -0600 Subject: [PATCH 1/2] feat: Wait for checkpoint steps during load test --- crates/checkpointer/src/lib.rs | 9 +++++++++ crates/checkpointer/src/main.rs | 6 +++++- src/commands/load/mod.rs | 31 ++++++++++++++++++++++++++++--- src/main.rs | 11 ++++++++++- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/crates/checkpointer/src/lib.rs b/crates/checkpointer/src/lib.rs index a8878e3b..e97c4e3b 100644 --- a/crates/checkpointer/src/lib.rs +++ b/crates/checkpointer/src/lib.rs @@ -49,6 +49,13 @@ pub struct ScenarioCheckpoint { pub num_checkpoints: usize, /// Number of query results stored in each checkpoint snapshot. pub num_queries: usize, + /// Number of ETL steps between each checkpoint. + /// + /// This is the step count that was passed to [`ETLPipeline::run`] during + /// checkpoint generation. Consumers can use this value to replay the + /// pipeline with the same cadence. + #[serde(default)] + pub checkpoint_interval_steps: usize, } /// S3‑backed store for uploading and downloading checkpoint artefacts. 
@@ -134,6 +141,7 @@ impl CheckpointStore { &self, scenario: &str, local_checkpoint_dir: &Path, + checkpoint_interval_steps: usize, ) -> anyhow::Result<()> { if !local_checkpoint_dir.is_dir() { anyhow::bail!( @@ -200,6 +208,7 @@ impl CheckpointStore { ScenarioCheckpoint { num_checkpoints, num_queries, + checkpoint_interval_steps, }, ); self.put_manifest(&manifest).await?; diff --git a/crates/checkpointer/src/main.rs b/crates/checkpointer/src/main.rs index d69da17f..29b52270 100644 --- a/crates/checkpointer/src/main.rs +++ b/crates/checkpointer/src/main.rs @@ -244,7 +244,11 @@ async fn main() -> anyhow::Result<()> { cli.endpoint.as_deref(), )?; checkpoint_store - .upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir) + .upload_checkpoints( + SCENARIO_NAME, + &cli.checkpoint_dir, + cli.checkpoint_interval_steps as usize, + ) .await?; tracing::info!("Checkpointer completed successfully"); diff --git a/src/commands/load/mod.rs b/src/commands/load/mod.rs index 5cbbc259..c3519176 100644 --- a/src/commands/load/mod.rs +++ b/src/commands/load/mod.rs @@ -201,6 +201,7 @@ pub(crate) async fn run( common_args: &CommonArgs, adbc_conn: adbc_client::AdbcConnection, etl_pipeline: &mut ETLPipeline, + checkpoint_steps: Option<usize>, ) -> anyhow::Result<()> { let metric_attributes = run_metric_attributes(common_args); @@ -292,21 +293,45 @@ pub(crate) async fn run( let shutdown_token = throughput_test.cancellation_token(); // --- Start the ETL pipeline (remaining batches) --- + // If checkpoint_steps is set, use `.run(steps)` so the pipeline pauses + // at checkpoint boundaries. Otherwise fall back to `.start()` which runs + // all remaining batches without pausing. 
tracing::info!("Starting ETL pipeline (remaining batches)..."); let mut etl_state_rx = etl_pipeline.state_watch(); - etl_pipeline.start().await?; + if let Some(steps) = checkpoint_steps { + tracing::info!(checkpoint_steps = steps, "Using checkpoint-aware ETL mode"); + etl_pipeline.run(steps).await?; + } else { + etl_pipeline.start().await?; + } let test_future = throughput_test.wait(); tokio::pin!(test_future); - // Wait for ETL pipeline completion, then cancel the test. + // Wait for ETL pipeline state changes, handling both pauses (checkpoint + // boundaries) and stops (completion / error / cancellation). + // + // When the pipeline pauses at a checkpoint boundary we immediately + // continue it. TODO: In the future this is where checkpoint-based query result + // validation would be triggered before resuming. + // // If interrupted (ctrl-c), cancel both the test and the ETL pipeline. let etl_error: Option<String> = loop { tokio::select! { - // ETL state changed — check if stopped + // ETL state changed — check if stopped or paused _ = etl_state_rx.changed() => { let state = etl_state_rx.borrow_and_update().clone(); match state { + PipelineState::Paused => { + tracing::info!("ETL pipeline paused at checkpoint boundary"); + // TODO: run checkpoint-based query result validation here + if let Err(e) = etl_pipeline.continue_pipeline() { + eprintln!("Failed to continue ETL pipeline after pause: {e}"); + shutdown_token.cancel(); + break Some(format!("Failed to continue ETL pipeline: {e}")); + } + tracing::info!("ETL pipeline resumed"); + } PipelineState::Stopped(StopReason::Completed) => { println!("ETL pipeline completed, stopping benchmark..."); shutdown_token.cancel(); diff --git a/src/main.rs b/src/main.rs index 59f61d76..44c23629 100644 --- a/src/main.rs +++ b/src/main.rs @@ -163,6 +163,7 @@ async fn main() -> anyhow::Result<()> { tracing::warn!("Failed to download checkpoint manifest - results validation will not be enabled: {e}"); e }).ok(); + let mut checkpoint_steps: 
Option<usize> = None; if let Some(manifest) = manifest && let Some(scenario_info) = manifest.scenarios.get(&scenario_name) { @@ -170,9 +171,13 @@ async fn main() -> anyhow::Result<()> { scenario = %scenario_name, num_checkpoints = scenario_info.num_checkpoints, num_queries = scenario_info.num_queries, + checkpoint_interval_steps = scenario_info.checkpoint_interval_steps, path = %checkpoint_dir.path().display(), "Downloading checkpoints" ); + if scenario_info.checkpoint_interval_steps > 0 { + checkpoint_steps = Some(scenario_info.checkpoint_interval_steps); + } if let Err(e) = checkpoint_store .download_checkpoints(&scenario_name, scenario_info, checkpoint_dir.path()) .await @@ -250,9 +255,13 @@ async fn main() -> anyhow::Result<()> { } }; - commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline).await?; + commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline, checkpoint_steps).await?; // --- Wait for ETL to finish --- + // If checkpoint_steps was set, the load runner already handled + // the pause/resume loop internally, so the pipeline should be + // in a stopped state by now. If it was started without checkpoints + // (.start()), the pipeline may still be running. 
let final_state = pipeline.wait().await; match &final_state { PipelineState::Stopped(StopReason::Completed) => { From 5fec06a833e107a303c2e18ce01794a8f4901be2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 18 Feb 2026 18:52:14 -0800 Subject: [PATCH 2/2] chore: auto-fix cargo fmt + clippy --- src/main.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 44c23629..1b8dfc69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -255,7 +255,14 @@ async fn main() -> anyhow::Result<()> { } }; - commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline, checkpoint_steps).await?; + commands::load::run( + &cli.common.scenario, + &cli.common, + load_conn, + &mut pipeline, + checkpoint_steps, + ) + .await?; // --- Wait for ETL to finish --- // If checkpoint_steps was set, the load runner already handled