Skip to content

Commit d50a81a

Browse files
Fix: run final checkpoint validation when parallel_count=0
1 parent bac5948 commit d50a81a

2 files changed

Lines changed: 182 additions & 7 deletions

File tree

  • crates/test-framework/src/spicetest/datasets
  • src/commands/load

crates/test-framework/src/spicetest/datasets/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,6 @@ impl SpiceTest<NotStarted> {
266266
return Err(anyhow::anyhow!("Query set is empty"));
267267
}
268268

269-
if self.state.parallel_count == 0 {
270-
return Err(anyhow::anyhow!("Parallel count must be greater than 0"));
271-
}
272-
273269
// Ensure executor is configured
274270
let executor = self
275271
.state
@@ -481,6 +477,9 @@ impl SpiceTest<Completed> {
481477
}
482478

483479
pub fn get_throughput_metric(&self, scale: f64) -> Result<f64> {
480+
if self.state.parallel_count == 0 {
481+
return Ok(0.0);
482+
}
484483
// metric = (Parallel Query Count * Test Suite Query Count * 3600) / Cs * Scale
485484
let lhs = self.state.parallel_count * self.state.query_count * 3600;
486485

src/commands/load/mod.rs

Lines changed: 179 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,8 @@ async fn validate_full_query_set(
557557
) -> bool {
558558
use futures::stream::{self, StreamExt};
559559

560+
// buffer_unordered(0) would never poll futures, so use at least 1.
561+
let effective_concurrency = concurrency.max(1);
560562
let results: Vec<(Arc<str>, _)> = stream::iter(queries)
561563
.map(|query| {
562564
let query_name = Arc::clone(&query.name);
@@ -567,7 +569,7 @@ async fn validate_full_query_set(
567569
(query_name, result)
568570
}
569571
})
570-
.buffer_unordered(concurrency)
572+
.buffer_unordered(effective_concurrency)
571573
.collect()
572574
.await;
573575

@@ -924,7 +926,180 @@ pub(crate) async fn run(
924926
// them post-loop so they carry the final `outcome` dimension.
925927
let mut checkpoint_e2e_latency_samples: Vec<f64> = Vec::new();
926928

927-
let run_outcome: Option<RunOutcome> = loop {
929+
// If there are no query workers, skip the ETL monitoring loop entirely —
930+
// just wait for the pipeline to finish, then proceed to results.
931+
let run_outcome: Option<RunOutcome> = if common_args.concurrency == 0 {
932+
// No query workers — just monitor the ETL pipeline and run checkpoint
933+
// validation when it pauses. Break when the pipeline reaches a terminal state.
934+
loop {
935+
if etl_state_rx.changed().await.is_err() {
936+
break None;
937+
}
938+
let state = etl_state_rx.borrow_and_update().clone();
939+
match state {
940+
PipelineState::Paused => {
941+
let checkpoint_idx = etl_pipeline.checkpoint_idx();
942+
tracing::info!(
943+
checkpoint_idx,
944+
"ETL pipeline paused at checkpoint boundary (no query workers)"
945+
);
946+
947+
if has_checkpoint_validation
948+
&& let Some(cp_dir) = checkpoint_dir
949+
{
950+
match load_checkpoint_results(cp_dir, checkpoint_idx, &query_names) {
951+
Ok(expected_results) if !expected_results.is_empty() => {
952+
let expected_row_counts =
953+
load_checkpoint_row_counts(cp_dir, checkpoint_idx)
954+
.unwrap_or_default();
955+
tracing::info!(
956+
checkpoint_idx,
957+
num_queries = expected_results.len(),
958+
num_tables = expected_row_counts.len(),
959+
"Running checkpoint validation (no query workers)"
960+
);
961+
962+
let checkpoint_pause_time = std::time::Instant::now();
963+
964+
let result = run_checkpoint_validation(
965+
&validation_executor,
966+
&queries,
967+
&expected_results,
968+
&expected_row_counts,
969+
checkpoint_idx,
970+
common_args.concurrency,
971+
Duration::from_secs(common_args.checkpoint_validation_period),
972+
Duration::from_secs(600),
973+
checkpoint_pause_time,
974+
query_catalog_namespace.as_deref(),
975+
)
976+
.await;
977+
978+
match result {
979+
CheckpointValidationResult::Converged { e2e_latency_ms } => {
980+
checkpoint_e2e_latency_samples.push(e2e_latency_ms);
981+
}
982+
CheckpointValidationResult::Interrupted => {
983+
eprintln!("Interrupt received during checkpoint validation, stopping...");
984+
etl_pipeline.cancel();
985+
break Some(RunOutcome::Cancelled);
986+
}
987+
CheckpointValidationResult::TimedOut => {
988+
eprintln!(
989+
"Checkpoint {checkpoint_idx} validation timed out"
990+
);
991+
etl_pipeline.cancel();
992+
break Some(RunOutcome::ValidationTimeout);
993+
}
994+
}
995+
}
996+
Ok(_) => {
997+
tracing::info!(
998+
checkpoint_idx,
999+
"No checkpoint results found, skipping validation"
1000+
);
1001+
}
1002+
Err(e) => {
1003+
tracing::warn!(
1004+
checkpoint_idx,
1005+
error = %e,
1006+
"Failed to load checkpoint results, skipping validation"
1007+
);
1008+
}
1009+
}
1010+
}
1011+
1012+
if let Err(e) = etl_pipeline.continue_pipeline() {
1013+
eprintln!("Failed to continue ETL pipeline: {e}");
1014+
break Some(RunOutcome::PipelineFailure(format!("Failed to continue ETL pipeline: {e}")));
1015+
}
1016+
}
1017+
PipelineState::Stopped(StopReason::Completed) => {
1018+
println!("ETL pipeline completed (no query workers)");
1019+
1020+
// --- Final checkpoint validation ---
1021+
// The pipeline transitions directly from Running →
1022+
// Completed after the last batch, so the final
1023+
// checkpoint boundary is never seen as a Paused state.
1024+
// Run validation for it here.
1025+
if has_checkpoint_validation
1026+
&& let Some(cp_dir) = checkpoint_dir
1027+
{
1028+
let checkpoint_idx = etl_pipeline.checkpoint_idx();
1029+
match load_checkpoint_results(cp_dir, checkpoint_idx, &query_names) {
1030+
Ok(expected_results) if !expected_results.is_empty() => {
1031+
let expected_row_counts =
1032+
load_checkpoint_row_counts(cp_dir, checkpoint_idx)
1033+
.unwrap_or_default();
1034+
tracing::info!(
1035+
checkpoint_idx,
1036+
num_queries = expected_results.len(),
1037+
num_tables = expected_row_counts.len(),
1038+
"Running final checkpoint validation (no query workers)"
1039+
);
1040+
1041+
let checkpoint_pause_time = std::time::Instant::now();
1042+
1043+
let result = run_checkpoint_validation(
1044+
&validation_executor,
1045+
&queries,
1046+
&expected_results,
1047+
&expected_row_counts,
1048+
checkpoint_idx,
1049+
common_args.concurrency,
1050+
Duration::from_secs(common_args.checkpoint_validation_period),
1051+
Duration::from_secs(600),
1052+
checkpoint_pause_time,
1053+
query_catalog_namespace.as_deref(),
1054+
)
1055+
.await;
1056+
1057+
match result {
1058+
CheckpointValidationResult::Converged { e2e_latency_ms } => {
1059+
checkpoint_e2e_latency_samples.push(e2e_latency_ms);
1060+
}
1061+
CheckpointValidationResult::Interrupted => {
1062+
eprintln!("Interrupt received during final checkpoint validation, stopping...");
1063+
break Some(RunOutcome::Cancelled);
1064+
}
1065+
CheckpointValidationResult::TimedOut => {
1066+
eprintln!(
1067+
"Final checkpoint {checkpoint_idx} validation timed out after 600s without convergence, aborting run"
1068+
);
1069+
break Some(RunOutcome::ValidationTimeout);
1070+
}
1071+
}
1072+
}
1073+
Ok(_) => {
1074+
tracing::info!(
1075+
checkpoint_idx,
1076+
"No final checkpoint results found, skipping validation"
1077+
);
1078+
}
1079+
Err(e) => {
1080+
tracing::warn!(
1081+
checkpoint_idx,
1082+
error = %e,
1083+
"Failed to load final checkpoint results, skipping validation"
1084+
);
1085+
}
1086+
}
1087+
}
1088+
1089+
break None;
1090+
}
1091+
PipelineState::Stopped(StopReason::Error(ref e)) => {
1092+
eprintln!("ETL pipeline failed: {e}");
1093+
break Some(RunOutcome::PipelineFailure(e.clone()));
1094+
}
1095+
PipelineState::Stopped(StopReason::Cancelled) => {
1096+
break None;
1097+
}
1098+
_ => {}
1099+
}
1100+
}
1101+
} else {
1102+
loop {
9281103
tokio::select! {
9291104
// ETL state changed — check if stopped or paused
9301105
_ = etl_state_rx.changed() => {
@@ -1084,7 +1259,7 @@ pub(crate) async fn run(
10841259
);
10851260
}
10861261
}
1087-
} else if !has_checkpoint_validation {
1262+
} else if !has_checkpoint_validation && common_args.concurrency > 0 {
10881263
// When results validation is not enabled, wait for
10891264
// at least 1 query set iteration to complete so we
10901265
// collect meaningful query metrics before stopping.
@@ -1133,6 +1308,7 @@ pub(crate) async fn run(
11331308
break Some(RunOutcome::Cancelled);
11341309
}
11351310
}
1311+
}
11361312
};
11371313

11381314
let test = match test_future.await {

0 commit comments

Comments
 (0)