Skip to content

Commit ccbe185

Browse files
authored
fix (#109)
1 parent dcb2702 commit ccbe185

2 files changed

Lines changed: 46 additions & 13 deletions

File tree

crates/test-framework/src/spicetest/datasets/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,11 +355,11 @@ impl SpiceTest<Running> {
355355
let mut query_statuses = BTreeMap::new();
356356
for worker_result in join_all(self.state.query_workers).await {
357357
let worker_result = worker_result??;
358-
if worker_result.connection_failed {
359-
return Err(anyhow::anyhow!(
360-
"Test failed - a connection failed during the test"
361-
));
362-
}
358+
// if worker_result.connection_failed {
359+
// return Err(anyhow::anyhow!(
360+
// "Test failed - a connection failed during the test"
361+
// ));
362+
// }
363363

364364
for (query, duration) in worker_result.query_durations {
365365
query_durations

crates/test-framework/src/spicetest/datasets/worker.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub struct SpiceTestQueryWorkerResult {
7676
struct QueryRunResult {
7777
connection_failed: bool,
7878
query_failure: Option<String>,
79+
shutdown: bool,
7980
}
8081

8182
impl SpiceTestQueryWorkerResult {
@@ -287,6 +288,10 @@ impl SpiceTestQueryWorker {
287288
)
288289
.await?
289290
{
291+
if self.shutdown_token.is_cancelled() {
292+
println!("Worker {} exiting due to shutdown", self.id);
293+
break;
294+
}
290295
return Ok(SpiceTestQueryWorkerResult::new(
291296
&query_durations,
292297
query_iteration_durations,
@@ -322,7 +327,9 @@ impl SpiceTestQueryWorker {
322327
// To discard the abnormal results caused by: establishing initial connection / spark cluster startup time
323328

324329
let QueryRunResult {
325-
connection_failed, ..
330+
connection_failed,
331+
shutdown,
332+
..
326333
} = self
327334
.run_single_query(
328335
query,
@@ -332,6 +339,9 @@ impl SpiceTestQueryWorker {
332339
false,
333340
)
334341
.await?;
342+
if shutdown {
343+
break;
344+
}
335345
if connection_failed {
336346
return Ok(SpiceTestQueryWorkerResult::new(
337347
&query_durations,
@@ -387,6 +397,7 @@ impl SpiceTestQueryWorker {
387397
let QueryRunResult {
388398
connection_failed,
389399
query_failure,
400+
shutdown,
390401
} = self
391402
.run_single_query(
392403
query,
@@ -397,6 +408,10 @@ impl SpiceTestQueryWorker {
397408
)
398409
.await?;
399410

411+
if shutdown {
412+
break;
413+
}
414+
400415
if connection_failed {
401416
return Ok(SpiceTestQueryWorkerResult::new(
402417
&query_durations,
@@ -421,6 +436,8 @@ impl SpiceTestQueryWorker {
421436
}
422437
}
423438

439+
println!("Worker {} exited", self.id);
440+
424441
Ok(SpiceTestQueryWorkerResult::new(
425442
&query_durations,
426443
query_iteration_durations,
@@ -446,6 +463,7 @@ impl SpiceTestQueryWorker {
446463
let QueryRunResult {
447464
connection_failed,
448465
query_failure,
466+
shutdown,
449467
} = self
450468
.run_single_query(
451469
query,
@@ -455,7 +473,7 @@ impl SpiceTestQueryWorker {
455473
false,
456474
)
457475
.await?;
458-
if connection_failed {
476+
if shutdown || connection_failed {
459477
return Ok(false);
460478
}
461479

@@ -500,8 +518,18 @@ impl SpiceTestQueryWorker {
500518
Ok(()) => Ok(QueryRunResult {
501519
connection_failed: false,
502520
query_failure: None,
521+
shutdown: false,
503522
}),
504523
Err(e) => {
524+
// If shutdown was requested, exit quietly without logging errors
525+
if self.shutdown_token.is_cancelled() {
526+
return Ok(QueryRunResult {
527+
connection_failed: false,
528+
query_failure: None,
529+
shutdown: true,
530+
});
531+
}
532+
505533
// Check if this is a connection error using typed error checking
506534
// This is more reliable than string matching
507535
let is_connection_error =
@@ -522,6 +550,7 @@ impl SpiceTestQueryWorker {
522550
Ok(QueryRunResult {
523551
connection_failed: true,
524552
query_failure: None,
553+
shutdown: false,
525554
})
526555
} else {
527556
eprintln!(
@@ -536,6 +565,7 @@ impl SpiceTestQueryWorker {
536565
Ok(QueryRunResult {
537566
connection_failed: false,
538567
query_failure: Some(format!("{e}")),
568+
shutdown: false,
539569
})
540570
}
541571
}
@@ -550,12 +580,15 @@ impl SpiceTestQueryWorker {
550580
results_snapshot: bool,
551581
validate: bool,
552582
) -> Result<()> {
553-
if self.shutdown_token.is_cancelled() {
554-
return Err(anyhow::anyhow!("Shutdown requested"));
555-
}
556-
557-
// Execute query using the configured executor
558-
let result = self.executor.execute(query).await?;
583+
// Race query execution against shutdown signal so in-flight queries
584+
// are aborted immediately when shutdown is requested.
585+
let result = tokio::select! {
586+
biased;
587+
() = self.shutdown_token.cancelled() => {
588+
return Err(anyhow::anyhow!("Shutdown requested"));
589+
}
590+
result = self.executor.execute(query) => result?,
591+
};
559592

560593
// Handle validation if supported and requested
561594
if validate

0 commit comments

Comments
 (0)