Skip to content

Commit 7949042

Browse files
Add Scenario enum and simplify load runner to not reference spiced directly
Introduce a Scenario enum that encapsulates benchmark configuration (query set, end condition) and use it to drive the load runner instead of coupling to BenchRunArgs/DatasetTestArgs and direct spiced instance management. - Add src/scenario.rs with Scenario enum, Display, load_query_set(), and end_condition() (TPCH runs for 60s) - Simplify load runner to accept Scenario + CommonArgs + ADBC connection, removing warm-up/baseline phases, spiced lifecycle, memory monitoring, health monitoring, and query overrides/validation - Remove health.rs (HealthMonitor) as it depended on direct spiced access - Trim CommonArgs to scenario-relevant fields (remove spicepod_path, spiced_path, duration, data_dir, etc.) - Simplify build_test_with_validation to take Scenario instead of DatasetTestArgs - Clean up unused imports, re-exports, and clippy warnings
1 parent b1cadd9 commit 7949042

6 files changed

Lines changed: 102 additions & 429 deletions

File tree

src/args/mod.rs

Lines changed: 4 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,22 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::path::PathBuf;
18-
1917
use clap::{ArgAction, Parser, ValueEnum};
2018

2119
mod dataset;
22-
pub use dataset::{BenchRunArgs, DatasetTestArgs};
20+
use crate::scenario::Scenario;
2321

2422
/// Arguments Common to all [`TestCommands`].
2523
#[derive(Parser, Debug, Clone)]
2624
pub struct CommonArgs {
27-
/// Path to the spicepod.yaml file
28-
#[arg(short('p'), long, default_value = "spicepod.yaml")]
29-
pub(crate) spicepod_path: PathBuf,
30-
31-
#[arg(short('z'), long)]
32-
pub(crate) spicepod_dependencies: Option<PathBuf>,
25+
/// The scenario to use for the benchmark run, which determines the query set and other parameters.
26+
#[arg(long)]
27+
pub(crate) scenario: Scenario,
3328

3429
/// The number of clients to run simultaneously. Each client will send a query, wait for a response, then send another query.
3530
#[arg(long, default_value = "1")]
3631
pub(crate) concurrency: usize,
3732

38-
/// Path to the spiced binary, or URL to an already-running spiced instance's Flight endpoint
39-
/// (e.g., `http://localhost:50051` to connect to an external instance)
40-
#[arg(short, long, default_value = "spiced")]
41-
pub(crate) spiced_path: String,
42-
43-
/// The number of seconds to wait for the spiced instance to become ready
44-
#[arg(long, default_value = "30")]
45-
pub(crate) ready_wait: u64,
46-
47-
/// The duration of the test in seconds
48-
#[arg(long, default_value = "60")]
49-
pub(crate) duration: u64,
50-
51-
/// Whether to disable progress bars, for CI or non-interactive environments
52-
#[arg(long)]
53-
pub(crate) disable_progress_bars: bool,
54-
55-
/// An optional data directory, to symlink into the spiced instance
56-
#[arg(short, long)]
57-
pub(crate) data_dir: Option<PathBuf>,
58-
59-
/// Whether to enable metrics collection
60-
#[arg(long)]
61-
pub(crate) metrics: bool,
62-
6333
/// Whether to collect SUT metrics via the system adapter JSON-RPC command.
6434
#[arg(long)]
6535
pub(crate) scrape_sut_metrics: bool,
@@ -108,23 +78,6 @@ pub struct CommonArgs {
10878
pub(crate) system_adapter_env: Vec<(String, String)>,
10979
}
11080

111-
#[allow(dead_code)]
112-
impl CommonArgs {
113-
/// Check if `spiced_path` is a URL to an external instance
114-
#[must_use]
115-
#[allow(dead_code)]
116-
pub fn is_external_instance(&self) -> bool {
117-
self.spiced_path.starts_with("http://") || self.spiced_path.starts_with("https://")
118-
}
119-
120-
/// Get the spiced path as a `PathBuf` (only valid when not an external instance)
121-
#[must_use]
122-
#[allow(dead_code)]
123-
pub fn spiced_path_buf(&self) -> PathBuf {
124-
PathBuf::from(&self.spiced_path)
125-
}
126-
}
127-
12881
fn parse_key_val(s: &str) -> Result<(String, String), String> {
12982
let pos = s
13083
.find('=')

src/commands/load/mod.rs

Lines changed: 31 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,18 @@ limitations under the License.
1515
*/
1616
#![allow(dead_code)]
1717

18-
use crate::args::BenchRunArgs;
18+
use crate::{args::CommonArgs, commands::adbc_executor, scenario::Scenario};
1919
use std::sync::Arc;
2020
use std::time::Duration;
2121
use system_adapter_protocol::MetricsResponse;
2222
use test_framework::{
2323
TestType, anyhow,
2424
arrow::util::pretty::print_batches,
25-
git,
2625
metrics::{MetricCollector, NoExtendedMetrics, QueryMetrics, QueryStatus, StatisticsCollector},
2726
opentelemetry::KeyValue,
28-
opentelemetry_sdk::Resource,
2927
spicetest::{
3028
SpiceTest,
31-
datasets::{EndCondition, NotStarted},
29+
datasets::NotStarted,
3230
},
3331
telemetry::streaming::StreamingOtlpExporter,
3432
};
@@ -115,82 +113,31 @@ fn spawn_sut_metrics_scraper(
115113

116114
#[expect(clippy::too_many_lines)]
117115
pub(crate) async fn run(
118-
args: &BenchRunArgs,
119-
adbc_conn: Option<adbc_client::AdbcConnection>,
116+
scenario: &Scenario,
117+
common_args: &CommonArgs,
118+
adbc_conn: adbc_client::AdbcConnection,
120119
) -> anyhow::Result<()> {
121-
if args.test_args.common.concurrency < 2 {
122-
return Err(anyhow::anyhow!(
123-
"Concurrency should be greater than 1 for a load test"
124-
));
125-
}
126-
127-
let sut_name = "spicebench-sut";
128-
129-
let spiced_commit_sha =
130-
std::env::var("SPICED_COMMIT").unwrap_or_else(|_| "unknown".to_string());
131-
let spicebench_commit_sha = git::get_commit_sha();
132-
let branch_name = git::get_branch_name();
133-
let spicepod = args.test_args.common.spicepod_path.display().to_string();
134-
135-
let query_set = args.test_args.load_query_set()?;
136-
let load_resource = Resource::builder_empty()
137-
.with_attributes(vec![
138-
KeyValue::new("service.name", sut_name.to_string()),
139-
KeyValue::new("type", "spicebench"),
140-
KeyValue::new("name", sut_name),
141-
KeyValue::new("query_set", query_set.to_string()),
142-
KeyValue::new("spicebench_commit_sha", spicebench_commit_sha),
143-
KeyValue::new("spiced_commit_sha", spiced_commit_sha),
144-
KeyValue::new("branch_name", branch_name),
145-
KeyValue::new("concurrency", args.test_args.common.concurrency.to_string()),
146-
KeyValue::new("spicepod", spicepod),
147-
KeyValue::new(
148-
"param_set_variants",
149-
args.test_args
150-
.random_param_set_count
151-
.unwrap_or(1)
152-
.to_string(),
153-
),
154-
KeyValue::new(
155-
"protocol",
156-
if args.test_args.http_clients {
157-
"http"
158-
} else {
159-
"flight"
160-
},
161-
),
162-
])
163-
.build();
164-
165-
// Create telemetry with resource upfront, before any metrics calls
166-
let telemetry = super::create_telemetry_with_resource(&args.test_args.common, load_resource);
167-
120+
scenario.load_query_set()?;
168121
// Create the appropriate query executor based on args
169-
let executor = super::create_query_executor(&args.test_args, None, adbc_conn).await?;
122+
let executor = Box::new(adbc_executor::AdbcDirectQueryExecutor::new(adbc_conn));
170123

171-
println!("Starting Spicebench run");
124+
println!("Running benchmark");
172125

173-
let load_end_condition = if args.run_until_stopped {
174-
EndCondition::Unlimited
175-
} else {
176-
EndCondition::Duration(Duration::from_secs(args.test_args.common.duration))
177-
};
126+
let load_end_condition = scenario.end_condition();
178127

179128
// Create streaming OTLP exporter if OTLP endpoint is configured
180-
let streaming_exporter = args
181-
.test_args
182-
.common
129+
let streaming_exporter = common_args
183130
.otlp_endpoint
184131
.as_ref()
185132
.map(|endpoint| StreamingOtlpExporter::spawn(endpoint.clone()));
186133

187134
// Spawn SUT metrics scraper if --scrape-sut-metrics is enabled and a system adapter is configured
188135
let sut_scraper_token = CancellationToken::new();
189-
let sut_scraper_handle = if args.test_args.common.scrape_sut_metrics
190-
&& (args.test_args.common.system_adapter_stdio_cmd.is_some()
191-
|| args.test_args.common.system_adapter_http_url.is_some())
136+
let sut_scraper_handle = if common_args.scrape_sut_metrics
137+
&& (common_args.system_adapter_stdio_cmd.is_some()
138+
|| common_args.system_adapter_http_url.is_some())
192139
{
193-
let adapter = super::connect_system_adapter(&args.test_args.common).await?;
140+
let adapter = super::connect_system_adapter(common_args).await?;
194141
let run_id = uuid::Uuid::new_v4();
195142
println!("SUT metrics scraping enabled (run_id={run_id})");
196143
Some(spawn_sut_metrics_scraper(
@@ -204,43 +151,33 @@ pub(crate) async fn run(
204151
};
205152

206153
// Record client concurrency as a gauge
207-
crate::metrics::ACTIVE_CONNECTIONS.record(
208-
args.test_args.common.concurrency.try_into().unwrap_or(0),
209-
&[],
210-
);
154+
crate::metrics::ACTIVE_CONNECTIONS.record(common_args.concurrency.try_into().unwrap_or(0), &[]);
211155

212156
let mut test_builder = NotStarted::new()
213-
.with_parallel_count(args.test_args.common.concurrency)
157+
.with_parallel_count(common_args.concurrency)
214158
.with_end_condition(load_end_condition)
215-
.with_query_executor(executor)
216-
.with_query_duration_threshold(args.test_args.mark_query_failed_if_exceeds);
159+
.with_query_executor(executor);
217160

218161
// Add streaming metrics sender if exporter is configured
219162
if let Some(exporter) = &streaming_exporter {
220163
test_builder = test_builder.with_streaming_metrics(exporter.sender());
221164
}
222165

223166
let (query_set, test_builder) =
224-
super::build_test_with_validation(&args.test_args, test_builder).await?;
167+
super::build_test_with_validation(scenario, test_builder).await?;
225168

226-
// Use the same query overrides that were applied in build_test_with_validation
227-
let query_overrides = args
228-
.test_args
229-
.query_overrides
230-
.clone()
231-
.map(test_framework::queries::QueryOverrides::from);
232-
let _queries = query_set.get_queries(query_overrides, None, None).await?;
169+
let _queries = query_set.get_queries(None, None, None).await?;
233170

234-
let throughput_test = SpiceTest::<NotStarted>::new("spicebench".into(), test_builder)
235-
.with_progress_bars(!args.test_args.common.disable_progress_bars)
171+
let throughput_test = SpiceTest::<NotStarted>::new(scenario.to_string(), test_builder)
172+
.with_progress_bars(false)
236173
.start()?;
237174
let shutdown_token = throughput_test.cancellation_token();
238175
let test_future = throughput_test.wait();
239176
tokio::pin!(test_future);
240177
let test = match tokio::select! {
241178
res = &mut test_future => res,
242179
_ = signal::ctrl_c() => {
243-
println!("Interrupt received, stopping load test...");
180+
println!("Interrupt received, stopping benchmark...");
244181
shutdown_token.cancel();
245182
test_future.await
246183
}
@@ -250,11 +187,14 @@ pub(crate) async fn run(
250187
return Err(e);
251188
}
252189
};
190+
test.get_query_durations().statistical_set()?;
191+
253192
// Get all query durations for overall statistics before ending the test
254193
let all_durations = test.get_query_durations().clone();
255194
let all_duration_values: Vec<_> = all_durations.values().flatten().copied().collect();
256195

257196
let metrics: QueryMetrics<_, NoExtendedMetrics> = test.collect(TestType::Load)?;
197+
let _ = test.end();
258198

259199
// Record per-query metrics for load test
260200
for query in &metrics.metrics {
@@ -283,13 +223,8 @@ pub(crate) async fn run(
283223
.record((metrics.finished_at - metrics.started_at).try_into()?, &[]);
284224

285225
// Query throughput metrics
286-
let total_iterations: u64 = metrics
287-
.metrics
288-
.iter()
289-
.map(|q| q.iterations as u64)
290-
.sum();
291-
let test_duration_secs =
292-
(metrics.finished_at - metrics.started_at) as f64 / 1000.0;
226+
let total_iterations: u64 = metrics.metrics.iter().map(|q| q.iterations as u64).sum();
227+
let test_duration_secs = (metrics.finished_at - metrics.started_at) as f64 / 1000.0;
293228
crate::metrics::QUERIES_TOTAL.add(total_iterations, &[]);
294229
if test_duration_secs > 0.0 {
295230
let qps = total_iterations as f64 / test_duration_secs;
@@ -300,8 +235,7 @@ pub(crate) async fn run(
300235
.map(|n| n.get() as f64)
301236
.unwrap_or(1.0);
302237
if cpu_cores > 0.0 {
303-
crate::metrics::EFFICIENCY_QUERIES_PER_CORE
304-
.record(qps / cpu_cores, &[]);
238+
crate::metrics::EFFICIENCY_QUERIES_PER_CORE.record(qps / cpu_cores, &[]);
305239
}
306240
}
307241

@@ -319,7 +253,8 @@ pub(crate) async fn run(
319253
);
320254
}
321255

322-
println!("Load test metrics:");
256+
println!("{}", vec!["-"; 30].join(""));
257+
println!("Benchmark metrics:");
323258
let records = metrics.build_records()?;
324259
print_batches(&records)?;
325260

@@ -328,9 +263,6 @@ pub(crate) async fn run(
328263
exporter.shutdown().await;
329264
}
330265

331-
telemetry.emit().await?;
332-
333-
println!("Spicebench run completed");
334-
266+
println!("Benchmark completed");
335267
Ok(())
336268
}

0 commit comments

Comments
 (0)