Skip to content

Commit 039e3a3

Browse files
authored
Simplify test run and remove dependency to running Spice instance (#39)
1 parent 6b52ebb commit 039e3a3

9 files changed

Lines changed: 51 additions & 251 deletions

File tree

crates/test-framework/src/spicetest/datasets/mod.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -534,12 +534,11 @@ impl MetricCollector<DatasetMetrics, NoExtendedMetrics> for SpiceTest<Completed>
534534
}
535535

536536
fn spiced_version(&self) -> Result<&str> {
537-
let spiced_instance = self.spiced_instance.as_ref().ok_or(
538-
anyhow::anyhow!(
539-
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
540-
))?;
541-
542-
Ok(spiced_instance.version())
537+
Ok(self
538+
.spiced_instance
539+
.as_ref()
540+
.map(|i| i.version())
541+
.unwrap_or("unknown"))
543542
}
544543

545544
fn metrics(&self) -> Result<Vec<QueryMetric<DatasetMetrics>>> {
@@ -588,12 +587,11 @@ impl MetricCollector<NoExtendedMetrics, ThroughputMetrics> for SpiceTest<Complet
588587
}
589588

590589
fn spiced_version(&self) -> Result<&str> {
591-
let spiced_instance = self.spiced_instance.as_ref().ok_or(
592-
anyhow::anyhow!(
593-
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
594-
))?;
595-
596-
Ok(spiced_instance.version())
590+
Ok(self
591+
.spiced_instance
592+
.as_ref()
593+
.map(|i| i.version())
594+
.unwrap_or("unknown"))
597595
}
598596

599597
fn metrics(&self) -> Result<Vec<QueryMetric<NoExtendedMetrics>>> {

crates/test-framework/src/spicetest/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ pub struct SpiceTest<S: TestState> {
4545
}
4646

4747
impl<S: TestCompleted> SpiceTest<S> {
48-
/// Once the test has completed, return ownership of the spiced instance
49-
pub fn end(self) -> Result<SpicedInstance> {
48+
/// Once the test has completed, return ownership of the spiced instance (if any).
49+
#[must_use]
50+
pub fn end(self) -> Option<SpicedInstance> {
5051
self.spiced_instance
51-
.context("Spiced instance should be present")
5252
}
5353
}
5454

crates/test-framework/src/spicetest/search/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,11 @@ impl MetricCollector<SearchScoreMetric, SearchRunMetric> for SpiceTest<Completed
218218
}
219219

220220
fn spiced_version(&self) -> Result<&str> {
221-
let spiced_instance = self.spiced_instance.as_ref().ok_or(
222-
anyhow::anyhow!(
223-
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
224-
))?;
225-
226-
Ok(spiced_instance.version())
221+
Ok(self
222+
.spiced_instance
223+
.as_ref()
224+
.map(|i| i.version())
225+
.unwrap_or("unknown"))
227226
}
228227

229228
fn metrics(&self) -> Result<Vec<QueryMetric<SearchScoreMetric>>> {

crates/test-framework/src/spicetest/text_to_sql/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,11 @@ impl MetricCollector<TextToSqlMetric, TextToSqlRunMetric> for SpiceTest<Complete
262262
}
263263

264264
fn spiced_version(&self) -> Result<&str> {
265-
let spiced_instance = self.spiced_instance.as_ref().ok_or(
266-
anyhow::anyhow!(
267-
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
268-
))?;
269-
270-
Ok(spiced_instance.version())
265+
Ok(self
266+
.spiced_instance
267+
.as_ref()
268+
.map(|i| i.version())
269+
.unwrap_or("unknown"))
271270
}
272271

273272
fn metrics(&self) -> Result<Vec<QueryMetric<TextToSqlMetric>>> {

src/args/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub struct CommonArgs {
108108
pub(crate) system_adapter_env: Vec<(String, String)>,
109109
}
110110

111+
#[allow(dead_code)]
111112
impl CommonArgs {
112113
/// Check if `spiced_path` is a URL to an external instance
113114
#[must_use]

src/commands/load/mod.rs

Lines changed: 10 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,22 @@ limitations under the License.
1515
*/
1616
#![allow(dead_code)]
1717

18-
use super::get_app_and_start_request;
19-
use crate::{args::BenchRunArgs, health::HealthMonitor};
18+
use crate::args::BenchRunArgs;
2019
use std::sync::Arc;
2120
use std::time::Duration;
2221
use system_adapter_protocol::MetricsResponse;
2322
use test_framework::{
2423
TestType, anyhow,
25-
app::AppBuilder,
2624
arrow::util::pretty::print_batches,
2725
git,
2826
metrics::{MetricCollector, NoExtendedMetrics, QueryMetrics, QueryStatus, StatisticsCollector},
2927
opentelemetry::KeyValue,
3028
opentelemetry_sdk::Resource,
31-
spiced::SpicedInstance,
32-
spicepod::Spicepod,
3329
spicetest::{
3430
SpiceTest,
3531
datasets::{EndCondition, NotStarted},
3632
},
3733
telemetry::streaming::StreamingOtlpExporter,
38-
utils::observe_memory,
3934
};
4035
use tokio::signal;
4136
use tokio::sync::Mutex;
@@ -129,32 +124,8 @@ pub(crate) async fn run(
129124
));
130125
}
131126

132-
// Check if connecting to an external instance or starting a new one
133-
let (app, mut spiced_instance) = if args.test_args.common.is_external_instance() {
134-
println!(
135-
"Connecting to external spiced instance at: {}",
136-
args.test_args.common.spiced_path
137-
);
138-
let spicepod = Spicepod::load_exact(args.test_args.common.spicepod_path.clone()).await?;
139-
let app = AppBuilder::new(spicepod.name.clone())
140-
.with_spicepod(spicepod)
141-
.build();
142-
let instance = SpicedInstance::external(&args.test_args.common.spiced_path);
143-
(app, instance)
144-
} else {
145-
let (app, start_request) = get_app_and_start_request(&args.test_args.common).await?;
146-
let instance = SpicedInstance::start(start_request).await?;
147-
(app, instance)
148-
};
149-
150-
spiced_instance
151-
.wait_for_ready(Duration::from_secs(args.test_args.common.ready_wait))
152-
.await?;
127+
let sut_name = "spicebench-sut";
153128

154-
// Build resource with attributes known upfront, before creating telemetry.
155-
// This ensures the SdkMeterProvider is created with the correct resource,
156-
// so all metrics (including HealthMonitor) have proper resource attributes.
157-
let spiced_version = spiced_instance.version().to_string();
158129
let spiced_commit_sha =
159130
std::env::var("SPICED_COMMIT").unwrap_or_else(|_| "unknown".to_string());
160131
let spicebench_commit_sha = git::get_commit_sha();
@@ -164,10 +135,9 @@ pub(crate) async fn run(
164135
let query_set = args.test_args.load_query_set()?;
165136
let load_resource = Resource::builder_empty()
166137
.with_attributes(vec![
167-
KeyValue::new("service.name", "spicebench"),
168-
KeyValue::new("type", "load_test"),
169-
KeyValue::new("name", app.name.clone()),
170-
KeyValue::new("spiced_version", spiced_version),
138+
KeyValue::new("service.name", sut_name.to_string()),
139+
KeyValue::new("type", "spicebench"),
140+
KeyValue::new("name", sut_name),
171141
KeyValue::new("query_set", query_set.to_string()),
172142
KeyValue::new("spicebench_commit_sha", spicebench_commit_sha),
173143
KeyValue::new("spiced_commit_sha", spiced_commit_sha),
@@ -195,70 +165,10 @@ pub(crate) async fn run(
195165
// Create telemetry with resource upfront, before any metrics calls
196166
let telemetry = super::create_telemetry_with_resource(&args.test_args.common, load_resource);
197167

198-
let health_monitor = HealthMonitor::spawn()?;
199-
200168
// Create the appropriate query executor based on args
201-
let executor = super::create_query_executor(&args.test_args, &spiced_instance, adbc_conn).await?;
202-
203-
// warm up run
204-
println!("Performing warm up");
205-
206-
let (baseline_query_set, test_builder) = super::build_test_with_validation(
207-
&args.test_args,
208-
NotStarted::new()
209-
.with_parallel_count(args.test_args.common.concurrency)
210-
.with_end_condition(EndCondition::QuerySetCompleted(1))
211-
.with_query_executor(executor.clone()),
212-
)
213-
.await?;
214-
215-
let warm_up = SpiceTest::<NotStarted>::new(app.name.clone(), test_builder)
216-
.with_spiced_instance(spiced_instance)
217-
.with_progress_bars(!args.test_args.common.disable_progress_bars)
218-
.start()?;
219-
220-
let spiced_instance = warm_up.wait().await?.end()?;
221-
222-
let test_duration = Duration::from_secs(args.test_args.common.duration);
169+
let executor = super::create_query_executor(&args.test_args, None, adbc_conn).await?;
223170

224-
// Calculate baseline duration: 10% of target time, min 1min, max 10min
225-
let baseline_duration_secs = (test_duration.as_secs() / 10).clamp(60, 600);
226-
let baseline_duration = Duration::from_secs(baseline_duration_secs);
227-
228-
// baseline run
229-
println!("Running baseline throughput test for {baseline_duration_secs}s",);
230-
231-
let (_, test_builder) = super::build_test_with_validation(
232-
&args.test_args,
233-
NotStarted::new()
234-
.with_parallel_count(args.test_args.common.concurrency)
235-
.with_end_condition(EndCondition::Duration(baseline_duration))
236-
.with_query_executor(executor.clone()),
237-
)
238-
.await?;
239-
240-
let baseline_test = SpiceTest::new(app.name.clone(), test_builder)
241-
.with_spiced_instance(spiced_instance)
242-
.with_progress_bars(!args.test_args.common.disable_progress_bars)
243-
.start()?;
244-
245-
let test = baseline_test.wait().await?;
246-
let baseline_percentiles = test.get_query_durations().percentile(99.0)?;
247-
248-
let baseline_metrics: QueryMetrics<_, NoExtendedMetrics> = test.collect(TestType::Load)?;
249-
println!("Baseline metrics:");
250-
let records = baseline_metrics.build_records()?;
251-
print_batches(&records)?;
252-
let spiced_instance = test.end()?;
253-
let memory_token = CancellationToken::new();
254-
// Memory monitoring is only available for owned spiced instances (not external)
255-
let memory_readings = spiced_instance
256-
.process()
257-
.ok()
258-
.map(|p| p.watch_memory(&memory_token));
259-
260-
// load test
261-
println!("Running load test");
171+
println!("Starting Spicebench run");
262172

263173
let load_end_condition = if args.run_until_stopped {
264174
EndCondition::Unlimited
@@ -321,8 +231,7 @@ pub(crate) async fn run(
321231
.map(test_framework::queries::QueryOverrides::from);
322232
let _queries = query_set.get_queries(query_overrides, None, None).await?;
323233

324-
let throughput_test = SpiceTest::<NotStarted>::new(app.name.clone(), test_builder)
325-
.with_spiced_instance(spiced_instance)
234+
let throughput_test = SpiceTest::<NotStarted>::new("spicebench".into(), test_builder)
326235
.with_progress_bars(!args.test_args.common.disable_progress_bars)
327236
.start()?;
328237
let shutdown_token = throughput_test.cancellation_token();
@@ -338,26 +247,14 @@ pub(crate) async fn run(
338247
} {
339248
Ok(test) => test,
340249
Err(e) => {
341-
if let Some(readings) = memory_readings {
342-
let _ = observe_memory(memory_token, readings).await;
343-
}
344250
return Err(e);
345251
}
346252
};
347-
let test_durations = test.get_query_durations().statistical_set()?;
348-
349253
// Get all query durations for overall statistics before ending the test
350254
let all_durations = test.get_query_durations().clone();
351255
let all_duration_values: Vec<_> = all_durations.values().flatten().copied().collect();
352256

353257
let metrics: QueryMetrics<_, NoExtendedMetrics> = test.collect(TestType::Load)?;
354-
let mut spiced_instance = test.end()?;
355-
let (max_memory, median_memory) = if let Some(readings) = memory_readings {
356-
observe_memory(memory_token, readings).await?
357-
} else {
358-
println!("Memory monitoring not available for external spiced instances");
359-
(0.0, 0.0)
360-
};
361258

362259
// Record per-query metrics for load test
363260
for query in &metrics.metrics {
@@ -384,8 +281,6 @@ pub(crate) async fn run(
384281
}
385282
crate::metrics::TEST_DURATION
386283
.record((metrics.finished_at - metrics.started_at).try_into()?, &[]);
387-
crate::metrics::PEAK_MEMORY_USAGE.record(max_memory * 1024.0, &[]);
388-
crate::metrics::MEDIAN_MEMORY_USAGE.record(median_memory * 1024.0, &[]);
389284

390285
// Query throughput metrics
391286
let total_iterations: u64 = metrics
@@ -424,92 +319,18 @@ pub(crate) async fn run(
424319
);
425320
}
426321

427-
println!("Baseline metrics:");
428-
let baseline_records = baseline_metrics.build_records()?;
429-
print_batches(&baseline_records)?;
430-
println!("{}", vec!["-"; 30].join(""));
431322
println!("Load test metrics:");
432-
let records = metrics.with_memory_usage(max_memory).build_records()?;
323+
let records = metrics.build_records()?;
433324
print_batches(&records)?;
434325

435-
let health_report = health_monitor.stop().await;
436-
437326
// Shutdown streaming exporter before emitting final telemetry
438327
if let Some(exporter) = streaming_exporter {
439328
exporter.shutdown().await;
440329
}
441330

442331
telemetry.emit().await?;
443332

444-
spiced_instance.stop()?;
445-
let health_report = health_report?;
446-
447-
let mut test_passed = true;
448-
let mut yellow_measurements = 0;
449-
450-
// Use baseline_queries that represent unique query names, otherwise the same failure
451-
// could be reported multiple times for each parameterized query params set variation
452-
for query in baseline_query_set
453-
.get_queries(query_overrides, None, None)
454-
.await?
455-
{
456-
let Some(baseline_percentile) = baseline_percentiles.get(&query.name) else {
457-
// Query Failed, no percentile statistics recorded
458-
continue;
459-
};
460-
461-
let Some(duration) = test_durations.get(&query.name) else {
462-
return Err(anyhow::anyhow!(
463-
"Query {} not found in test durations",
464-
query.name
465-
));
466-
};
467-
468-
let percentile_99th = duration.percentile(99.0)?;
469-
if percentile_99th.as_millis() < 1000 {
470-
continue; // skip queries that are too fast to be meaningful
471-
}
472-
473-
let percentile_ratio =
474-
((percentile_99th.as_secs_f64() / baseline_percentile.as_secs_f64()) - 1.0) * 100.0;
475-
476-
// yellow measurements = 10% to 20% increase
477-
// red measurements = > 20% increase
478-
let (yellow, red) = (
479-
percentile_ratio > 10.0 && percentile_ratio <= 20.0,
480-
percentile_ratio > 20.0,
481-
);
482-
483-
if red {
484-
println!(
485-
"FAIL - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile",
486-
query = query.name
487-
);
488-
test_passed = false;
489-
} else if yellow {
490-
println!(
491-
"WARN - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile",
492-
query = query.name
493-
);
494-
yellow_measurements += 1;
495-
}
496-
}
497-
498-
let mut failure_messages = Vec::new();
499-
if !args.no_error && yellow_measurements >= 3 {
500-
failure_messages.push("Load test failed due to too many yellow measurements".to_string());
501-
}
502-
if !args.no_error && !test_passed {
503-
failure_messages.push("Load test failed.".to_string());
504-
}
505-
if let Some(message) = health_report.failure_message() {
506-
failure_messages.push(message);
507-
}
508-
509-
if !failure_messages.is_empty() {
510-
return Err(anyhow::anyhow!(failure_messages.join("\n")));
511-
}
333+
println!("Spicebench run completed");
512334

513-
println!("Load test completed");
514335
Ok(())
515336
}

0 commit comments

Comments
 (0)