Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions crates/test-framework/src/spicetest/datasets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,11 @@ impl MetricCollector<DatasetMetrics, NoExtendedMetrics> for SpiceTest<Completed>
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
Ok(self
.spiced_instance
.as_ref()
.map(|i| i.version())
.unwrap_or("unknown"))
}

fn metrics(&self) -> Result<Vec<QueryMetric<DatasetMetrics>>> {
Expand Down Expand Up @@ -588,12 +587,11 @@ impl MetricCollector<NoExtendedMetrics, ThroughputMetrics> for SpiceTest<Complet
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
Ok(self
.spiced_instance
.as_ref()
.map(|i| i.version())
.unwrap_or("unknown"))
}

fn metrics(&self) -> Result<Vec<QueryMetric<NoExtendedMetrics>>> {
Expand Down
6 changes: 3 additions & 3 deletions crates/test-framework/src/spicetest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct SpiceTest<S: TestState> {
}

impl<S: TestCompleted> SpiceTest<S> {
/// Once the test has completed, return ownership of the spiced instance
pub fn end(self) -> Result<SpicedInstance> {
/// Once the test has completed, return ownership of the spiced instance (if any).
#[must_use]
pub fn end(self) -> Option<SpicedInstance> {
self.spiced_instance
.context("Spiced instance should be present")
}
}

Expand Down
11 changes: 5 additions & 6 deletions crates/test-framework/src/spicetest/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,11 @@ impl MetricCollector<SearchScoreMetric, SearchRunMetric> for SpiceTest<Completed
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
Ok(self
.spiced_instance
.as_ref()
.map(|i| i.version())
.unwrap_or("unknown"))
}

fn metrics(&self) -> Result<Vec<QueryMetric<SearchScoreMetric>>> {
Expand Down
11 changes: 5 additions & 6 deletions crates/test-framework/src/spicetest/text_to_sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,11 @@ impl MetricCollector<TextToSqlMetric, TextToSqlRunMetric> for SpiceTest<Complete
}

fn spiced_version(&self) -> Result<&str> {
let spiced_instance = self.spiced_instance.as_ref().ok_or(
anyhow::anyhow!(
"Spiced instance is not available. SpiceTest must be started before metrics can be collected."
))?;

Ok(spiced_instance.version())
Ok(self
.spiced_instance
.as_ref()
.map(|i| i.version())
.unwrap_or("unknown"))
}

fn metrics(&self) -> Result<Vec<QueryMetric<TextToSqlMetric>>> {
Expand Down
1 change: 1 addition & 0 deletions src/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct CommonArgs {
pub(crate) system_adapter_env: Vec<(String, String)>,
}

#[allow(dead_code)]
impl CommonArgs {
/// Check if `spiced_path` is a URL to an external instance
#[must_use]
Expand Down
199 changes: 10 additions & 189 deletions src/commands/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,22 @@ limitations under the License.
*/
#![allow(dead_code)]

use super::get_app_and_start_request;
use crate::{args::BenchRunArgs, health::HealthMonitor};
use crate::args::BenchRunArgs;
use std::sync::Arc;
use std::time::Duration;
use system_adapter_protocol::MetricsResponse;
use test_framework::{
TestType, anyhow,
app::AppBuilder,
arrow::util::pretty::print_batches,
git,
metrics::{MetricCollector, NoExtendedMetrics, QueryMetrics, QueryStatus, StatisticsCollector},
opentelemetry::KeyValue,
opentelemetry_sdk::Resource,
spiced::SpicedInstance,
spicepod::Spicepod,
spicetest::{
SpiceTest,
datasets::{EndCondition, NotStarted},
},
telemetry::streaming::StreamingOtlpExporter,
utils::observe_memory,
};
use tokio::signal;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -129,32 +124,8 @@ pub(crate) async fn run(
));
}

// Check if connecting to an external instance or starting a new one
let (app, mut spiced_instance) = if args.test_args.common.is_external_instance() {
println!(
"Connecting to external spiced instance at: {}",
args.test_args.common.spiced_path
);
let spicepod = Spicepod::load_exact(args.test_args.common.spicepod_path.clone()).await?;
let app = AppBuilder::new(spicepod.name.clone())
.with_spicepod(spicepod)
.build();
let instance = SpicedInstance::external(&args.test_args.common.spiced_path);
(app, instance)
} else {
let (app, start_request) = get_app_and_start_request(&args.test_args.common).await?;
let instance = SpicedInstance::start(start_request).await?;
(app, instance)
};

spiced_instance
.wait_for_ready(Duration::from_secs(args.test_args.common.ready_wait))
.await?;
let sut_name = "spicebench-sut";

// Build resource with attributes known upfront, before creating telemetry.
// This ensures the SdkMeterProvider is created with the correct resource,
// so all metrics (including HealthMonitor) have proper resource attributes.
let spiced_version = spiced_instance.version().to_string();
let spiced_commit_sha =
std::env::var("SPICED_COMMIT").unwrap_or_else(|_| "unknown".to_string());
let spicebench_commit_sha = git::get_commit_sha();
Expand All @@ -164,10 +135,9 @@ pub(crate) async fn run(
let query_set = args.test_args.load_query_set()?;
let load_resource = Resource::builder_empty()
.with_attributes(vec![
KeyValue::new("service.name", "spicebench"),
KeyValue::new("type", "load_test"),
KeyValue::new("name", app.name.clone()),
KeyValue::new("spiced_version", spiced_version),
KeyValue::new("service.name", sut_name.to_string()),
KeyValue::new("type", "spicebench"),
KeyValue::new("name", sut_name),
KeyValue::new("query_set", query_set.to_string()),
KeyValue::new("spicebench_commit_sha", spicebench_commit_sha),
KeyValue::new("spiced_commit_sha", spiced_commit_sha),
Expand Down Expand Up @@ -195,70 +165,10 @@ pub(crate) async fn run(
// Create telemetry with resource upfront, before any metrics calls
let telemetry = super::create_telemetry_with_resource(&args.test_args.common, load_resource);

let health_monitor = HealthMonitor::spawn()?;

// Create the appropriate query executor based on args
let executor = super::create_query_executor(&args.test_args, &spiced_instance, adbc_conn).await?;

// warm up run
println!("Performing warm up");

let (baseline_query_set, test_builder) = super::build_test_with_validation(
&args.test_args,
NotStarted::new()
.with_parallel_count(args.test_args.common.concurrency)
.with_end_condition(EndCondition::QuerySetCompleted(1))
.with_query_executor(executor.clone()),
)
.await?;

let warm_up = SpiceTest::<NotStarted>::new(app.name.clone(), test_builder)
.with_spiced_instance(spiced_instance)
.with_progress_bars(!args.test_args.common.disable_progress_bars)
.start()?;

let spiced_instance = warm_up.wait().await?.end()?;

let test_duration = Duration::from_secs(args.test_args.common.duration);
let executor = super::create_query_executor(&args.test_args, None, adbc_conn).await?;

// Calculate baseline duration: 10% of target time, min 1min, max 10min
let baseline_duration_secs = (test_duration.as_secs() / 10).clamp(60, 600);
let baseline_duration = Duration::from_secs(baseline_duration_secs);

// baseline run
println!("Running baseline throughput test for {baseline_duration_secs}s",);

let (_, test_builder) = super::build_test_with_validation(
&args.test_args,
NotStarted::new()
.with_parallel_count(args.test_args.common.concurrency)
.with_end_condition(EndCondition::Duration(baseline_duration))
.with_query_executor(executor.clone()),
)
.await?;

let baseline_test = SpiceTest::new(app.name.clone(), test_builder)
.with_spiced_instance(spiced_instance)
.with_progress_bars(!args.test_args.common.disable_progress_bars)
.start()?;

let test = baseline_test.wait().await?;
let baseline_percentiles = test.get_query_durations().percentile(99.0)?;

let baseline_metrics: QueryMetrics<_, NoExtendedMetrics> = test.collect(TestType::Load)?;
println!("Baseline metrics:");
let records = baseline_metrics.build_records()?;
print_batches(&records)?;
let spiced_instance = test.end()?;
let memory_token = CancellationToken::new();
// Memory monitoring is only available for owned spiced instances (not external)
let memory_readings = spiced_instance
.process()
.ok()
.map(|p| p.watch_memory(&memory_token));

// load test
println!("Running load test");
println!("Starting Spicebench run");

let load_end_condition = if args.run_until_stopped {
EndCondition::Unlimited
Expand Down Expand Up @@ -321,8 +231,7 @@ pub(crate) async fn run(
.map(test_framework::queries::QueryOverrides::from);
let _queries = query_set.get_queries(query_overrides, None, None).await?;

let throughput_test = SpiceTest::<NotStarted>::new(app.name.clone(), test_builder)
.with_spiced_instance(spiced_instance)
let throughput_test = SpiceTest::<NotStarted>::new("spicebench".into(), test_builder)
.with_progress_bars(!args.test_args.common.disable_progress_bars)
.start()?;
let shutdown_token = throughput_test.cancellation_token();
Expand All @@ -338,26 +247,14 @@ pub(crate) async fn run(
} {
Ok(test) => test,
Err(e) => {
if let Some(readings) = memory_readings {
let _ = observe_memory(memory_token, readings).await;
}
return Err(e);
}
};
let test_durations = test.get_query_durations().statistical_set()?;

// Get all query durations for overall statistics before ending the test
let all_durations = test.get_query_durations().clone();
let all_duration_values: Vec<_> = all_durations.values().flatten().copied().collect();

let metrics: QueryMetrics<_, NoExtendedMetrics> = test.collect(TestType::Load)?;
let mut spiced_instance = test.end()?;
let (max_memory, median_memory) = if let Some(readings) = memory_readings {
observe_memory(memory_token, readings).await?
} else {
println!("Memory monitoring not available for external spiced instances");
(0.0, 0.0)
};

// Record per-query metrics for load test
for query in &metrics.metrics {
Expand All @@ -384,8 +281,6 @@ pub(crate) async fn run(
}
crate::metrics::TEST_DURATION
.record((metrics.finished_at - metrics.started_at).try_into()?, &[]);
crate::metrics::PEAK_MEMORY_USAGE.record(max_memory * 1024.0, &[]);
crate::metrics::MEDIAN_MEMORY_USAGE.record(median_memory * 1024.0, &[]);

// Query throughput metrics
let total_iterations: u64 = metrics
Expand Down Expand Up @@ -424,92 +319,18 @@ pub(crate) async fn run(
);
}

println!("Baseline metrics:");
let baseline_records = baseline_metrics.build_records()?;
print_batches(&baseline_records)?;
println!("{}", vec!["-"; 30].join(""));
println!("Load test metrics:");
let records = metrics.with_memory_usage(max_memory).build_records()?;
let records = metrics.build_records()?;
print_batches(&records)?;

let health_report = health_monitor.stop().await;

// Shutdown streaming exporter before emitting final telemetry
if let Some(exporter) = streaming_exporter {
exporter.shutdown().await;
}

telemetry.emit().await?;

spiced_instance.stop()?;
let health_report = health_report?;

let mut test_passed = true;
let mut yellow_measurements = 0;

// Use baseline_queries that represent unique query names, otherwise the same failure
// could be reported multiple times for each parameterized query params set variation
for query in baseline_query_set
.get_queries(query_overrides, None, None)
.await?
{
let Some(baseline_percentile) = baseline_percentiles.get(&query.name) else {
// Query Failed, no percentile statistics recorded
continue;
};

let Some(duration) = test_durations.get(&query.name) else {
return Err(anyhow::anyhow!(
"Query {} not found in test durations",
query.name
));
};

let percentile_99th = duration.percentile(99.0)?;
if percentile_99th.as_millis() < 1000 {
continue; // skip queries that are too fast to be meaningful
}

let percentile_ratio =
((percentile_99th.as_secs_f64() / baseline_percentile.as_secs_f64()) - 1.0) * 100.0;

// yellow measurements = 10% to 20% increase
// red measurements = > 20% increase
let (yellow, red) = (
percentile_ratio > 10.0 && percentile_ratio <= 20.0,
percentile_ratio > 20.0,
);

if red {
println!(
"FAIL - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile",
query = query.name
);
test_passed = false;
} else if yellow {
println!(
"WARN - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile",
query = query.name
);
yellow_measurements += 1;
}
}

let mut failure_messages = Vec::new();
if !args.no_error && yellow_measurements >= 3 {
failure_messages.push("Load test failed due to too many yellow measurements".to_string());
}
if !args.no_error && !test_passed {
failure_messages.push("Load test failed.".to_string());
}
if let Some(message) = health_report.failure_message() {
failure_messages.push(message);
}

if !failure_messages.is_empty() {
return Err(anyhow::anyhow!(failure_messages.join("\n")));
}
println!("Spicebench run completed");

println!("Load test completed");
Ok(())
}
Loading
Loading