Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/test-framework/src/spicetest/datasets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,11 @@ impl SpiceTest<Running> {
let mut query_statuses = BTreeMap::new();
for worker_result in join_all(self.state.query_workers).await {
let worker_result = worker_result??;
if worker_result.connection_failed {
return Err(anyhow::anyhow!(
"Test failed - a connection failed during the test"
));
}
// if worker_result.connection_failed {
// return Err(anyhow::anyhow!(
// "Test failed - a connection failed during the test"
// ));
// }

for (query, duration) in worker_result.query_durations {
query_durations
Expand Down
49 changes: 41 additions & 8 deletions crates/test-framework/src/spicetest/datasets/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct SpiceTestQueryWorkerResult {
struct QueryRunResult {
connection_failed: bool,
query_failure: Option<String>,
shutdown: bool,
}

impl SpiceTestQueryWorkerResult {
Expand Down Expand Up @@ -287,6 +288,10 @@ impl SpiceTestQueryWorker {
)
.await?
{
if self.shutdown_token.is_cancelled() {
println!("Worker {} exiting due to shutdown", self.id);
break;
}
return Ok(SpiceTestQueryWorkerResult::new(
&query_durations,
query_iteration_durations,
Expand Down Expand Up @@ -322,7 +327,9 @@ impl SpiceTestQueryWorker {
// To discard the abnormal results caused by: establishing initial connection / spark cluster startup time

let QueryRunResult {
connection_failed, ..
connection_failed,
shutdown,
..
} = self
.run_single_query(
query,
Expand All @@ -332,6 +339,9 @@ impl SpiceTestQueryWorker {
false,
)
.await?;
if shutdown {
break;
}
if connection_failed {
return Ok(SpiceTestQueryWorkerResult::new(
&query_durations,
Expand Down Expand Up @@ -387,6 +397,7 @@ impl SpiceTestQueryWorker {
let QueryRunResult {
connection_failed,
query_failure,
shutdown,
} = self
.run_single_query(
query,
Expand All @@ -397,6 +408,10 @@ impl SpiceTestQueryWorker {
)
.await?;

if shutdown {
break;
}

if connection_failed {
return Ok(SpiceTestQueryWorkerResult::new(
&query_durations,
Expand All @@ -421,6 +436,8 @@ impl SpiceTestQueryWorker {
}
}

println!("Worker {} exited", self.id);

Ok(SpiceTestQueryWorkerResult::new(
&query_durations,
query_iteration_durations,
Expand All @@ -446,6 +463,7 @@ impl SpiceTestQueryWorker {
let QueryRunResult {
connection_failed,
query_failure,
shutdown,
} = self
.run_single_query(
query,
Expand All @@ -455,7 +473,7 @@ impl SpiceTestQueryWorker {
false,
)
.await?;
if connection_failed {
if shutdown || connection_failed {
return Ok(false);
}

Expand Down Expand Up @@ -500,8 +518,18 @@ impl SpiceTestQueryWorker {
Ok(()) => Ok(QueryRunResult {
connection_failed: false,
query_failure: None,
shutdown: false,
}),
Err(e) => {
// If shutdown was requested, exit quietly without logging errors
if self.shutdown_token.is_cancelled() {
return Ok(QueryRunResult {
connection_failed: false,
query_failure: None,
shutdown: true,
});
}

// Check if this is a connection error using typed error checking
// This is more reliable than string matching
let is_connection_error =
Expand All @@ -522,6 +550,7 @@ impl SpiceTestQueryWorker {
Ok(QueryRunResult {
connection_failed: true,
query_failure: None,
shutdown: false,
})
} else {
eprintln!(
Expand All @@ -536,6 +565,7 @@ impl SpiceTestQueryWorker {
Ok(QueryRunResult {
connection_failed: false,
query_failure: Some(format!("{e}")),
shutdown: false,
})
}
}
Expand All @@ -550,12 +580,15 @@ impl SpiceTestQueryWorker {
results_snapshot: bool,
validate: bool,
) -> Result<()> {
if self.shutdown_token.is_cancelled() {
return Err(anyhow::anyhow!("Shutdown requested"));
}

// Execute query using the configured executor
let result = self.executor.execute(query).await?;
// Race query execution against shutdown signal so in-flight queries
// are aborted immediately when shutdown is requested.
let result = tokio::select! {
biased;
() = self.shutdown_token.cancelled() => {
return Err(anyhow::anyhow!("Shutdown requested"));
}
result = self.executor.execute(query) => result?,
};

// Handle validation if supported and requested
if validate
Expand Down