Skip to content

Commit 616989e

Browse files
committed
feat: Wait for checkpoint steps during load test
1 parent 15e768d commit 616989e

4 files changed

Lines changed: 52 additions & 5 deletions

File tree

crates/checkpointer/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ pub struct ScenarioCheckpoint {
4949
pub num_checkpoints: usize,
5050
/// Number of query results stored in each checkpoint snapshot.
5151
pub num_queries: usize,
52+
/// Number of ETL steps between consecutive checkpoints.
53+
///
54+
/// This is the step count that was passed to [`ETLPipeline::run`] during
55+
/// checkpoint generation. Consumers can use this value to replay the
56+
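/// pipeline with the same cadence. When the field is absent from the
/// manifest it deserializes to `0`, which the load command treats as
/// "no interval recorded" and falls back to running without pauses.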
57+
#[serde(default)]
58+
pub checkpoint_interval_steps: usize,
5259
}
5360

5461
/// S3‑backed store for uploading and downloading checkpoint artefacts.
@@ -134,6 +141,7 @@ impl CheckpointStore {
134141
&self,
135142
scenario: &str,
136143
local_checkpoint_dir: &Path,
144+
checkpoint_interval_steps: usize,
137145
) -> anyhow::Result<()> {
138146
if !local_checkpoint_dir.is_dir() {
139147
anyhow::bail!(
@@ -200,6 +208,7 @@ impl CheckpointStore {
200208
ScenarioCheckpoint {
201209
num_checkpoints,
202210
num_queries,
211+
checkpoint_interval_steps,
203212
},
204213
);
205214
self.put_manifest(&manifest).await?;

crates/checkpointer/src/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,11 @@ async fn main() -> anyhow::Result<()> {
244244
cli.endpoint.as_deref(),
245245
)?;
246246
checkpoint_store
247-
.upload_checkpoints(SCENARIO_NAME, &cli.checkpoint_dir)
247+
.upload_checkpoints(
248+
SCENARIO_NAME,
249+
&cli.checkpoint_dir,
250+
cli.checkpoint_interval_steps as usize,
251+
)
248252
.await?;
249253

250254
tracing::info!("Checkpointer completed successfully");

src/commands/load/mod.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ pub(crate) async fn run(
201201
common_args: &CommonArgs,
202202
adbc_conn: adbc_client::AdbcConnection,
203203
etl_pipeline: &mut ETLPipeline,
204+
checkpoint_steps: Option<usize>,
204205
) -> anyhow::Result<()> {
205206
let metric_attributes = run_metric_attributes(common_args);
206207

@@ -292,21 +293,45 @@ pub(crate) async fn run(
292293
let shutdown_token = throughput_test.cancellation_token();
293294

294295
// --- Start the ETL pipeline (remaining batches) ---
296+
// If checkpoint_steps is set, use `.run(steps)` so the pipeline pauses
297+
// at checkpoint boundaries. Otherwise fall back to `.start()`, which runs
298+
// all remaining batches without pausing.
295299
tracing::info!("Starting ETL pipeline (remaining batches)...");
296300
let mut etl_state_rx = etl_pipeline.state_watch();
297-
etl_pipeline.start().await?;
301+
if let Some(steps) = checkpoint_steps {
302+
tracing::info!(checkpoint_steps = steps, "Using checkpoint-aware ETL mode");
303+
etl_pipeline.run(steps).await?;
304+
} else {
305+
etl_pipeline.start().await?;
306+
}
298307

299308
let test_future = throughput_test.wait();
300309
tokio::pin!(test_future);
301310

302-
// Wait for ETL pipeline completion, then cancel the test.
311+
// Wait for ETL pipeline state changes, handling both pauses (checkpoint
312+
// boundaries) and stops (completion / error / cancellation).
313+
//
314+
// When the pipeline pauses at a checkpoint boundary we immediately
315+
// continue it. TODO: In the future this is where checkpoint-based query result
316+
// validation would be triggered before resuming.
317+
//
303318
// If interrupted (ctrl-c), cancel both the test and the ETL pipeline.
304319
let etl_error: Option<String> = loop {
305320
tokio::select! {
306-
// ETL state changed — check if stopped
321+
// ETL state changed — check if stopped or paused
307322
_ = etl_state_rx.changed() => {
308323
let state = etl_state_rx.borrow_and_update().clone();
309324
match state {
325+
PipelineState::Paused => {
326+
tracing::info!("ETL pipeline paused at checkpoint boundary");
327+
// TODO: run checkpoint-based query result validation here
328+
if let Err(e) = etl_pipeline.continue_pipeline() {
329+
eprintln!("Failed to continue ETL pipeline after pause: {e}");
330+
shutdown_token.cancel();
331+
break Some(format!("Failed to continue ETL pipeline: {e}"));
332+
}
333+
tracing::info!("ETL pipeline resumed");
334+
}
310335
PipelineState::Stopped(StopReason::Completed) => {
311336
println!("ETL pipeline completed, stopping benchmark...");
312337
shutdown_token.cancel();

src/main.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,21 @@ async fn main() -> anyhow::Result<()> {
163163
tracing::warn!("Failed to download checkpoint manifest - results validation will not be enabled: {e}");
164164
e
165165
}).ok();
166+
let mut checkpoint_steps: Option<usize> = None;
166167
if let Some(manifest) = manifest
167168
&& let Some(scenario_info) = manifest.scenarios.get(&scenario_name)
168169
{
169170
tracing::info!(
170171
scenario = %scenario_name,
171172
num_checkpoints = scenario_info.num_checkpoints,
172173
num_queries = scenario_info.num_queries,
174+
checkpoint_interval_steps = scenario_info.checkpoint_interval_steps,
173175
path = %checkpoint_dir.path().display(),
174176
"Downloading checkpoints"
175177
);
178+
if scenario_info.checkpoint_interval_steps > 0 {
179+
checkpoint_steps = Some(scenario_info.checkpoint_interval_steps);
180+
}
176181
if let Err(e) = checkpoint_store
177182
.download_checkpoints(&scenario_name, scenario_info, checkpoint_dir.path())
178183
.await
@@ -250,9 +255,13 @@ async fn main() -> anyhow::Result<()> {
250255
}
251256
};
252257

253-
commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline).await?;
258+
commands::load::run(&cli.common.scenario, &cli.common, load_conn, &mut pipeline, checkpoint_steps).await?;
254259

255260
// --- Wait for ETL to finish ---
261+
// If checkpoint_steps was set, the load runner already handled
262+
// the pause/resume loop internally, so the pipeline should be
263+
// in a stopped state by now. If it was started without checkpoints
264+
// (.start()), the pipeline may still be running.
256265
let final_state = pipeline.wait().await;
257266
match &final_state {
258267
PipelineState::Stopped(StopReason::Completed) => {

0 commit comments

Comments
 (0)