@@ -15,27 +15,22 @@ limitations under the License.
1515*/
1616#![ allow( dead_code) ]
1717
18- use super :: get_app_and_start_request;
19- use crate :: { args:: BenchRunArgs , health:: HealthMonitor } ;
18+ use crate :: args:: BenchRunArgs ;
2019use std:: sync:: Arc ;
2120use std:: time:: Duration ;
2221use system_adapter_protocol:: MetricsResponse ;
2322use test_framework:: {
2423 TestType , anyhow,
25- app:: AppBuilder ,
2624 arrow:: util:: pretty:: print_batches,
2725 git,
2826 metrics:: { MetricCollector , NoExtendedMetrics , QueryMetrics , QueryStatus , StatisticsCollector } ,
2927 opentelemetry:: KeyValue ,
3028 opentelemetry_sdk:: Resource ,
31- spiced:: SpicedInstance ,
32- spicepod:: Spicepod ,
3329 spicetest:: {
3430 SpiceTest ,
3531 datasets:: { EndCondition , NotStarted } ,
3632 } ,
3733 telemetry:: streaming:: StreamingOtlpExporter ,
38- utils:: observe_memory,
3934} ;
4035use tokio:: signal;
4136use tokio:: sync:: Mutex ;
@@ -129,32 +124,8 @@ pub(crate) async fn run(
129124 ) ) ;
130125 }
131126
132- // Check if connecting to an external instance or starting a new one
133- let ( app, mut spiced_instance) = if args. test_args . common . is_external_instance ( ) {
134- println ! (
135- "Connecting to external spiced instance at: {}" ,
136- args. test_args. common. spiced_path
137- ) ;
138- let spicepod = Spicepod :: load_exact ( args. test_args . common . spicepod_path . clone ( ) ) . await ?;
139- let app = AppBuilder :: new ( spicepod. name . clone ( ) )
140- . with_spicepod ( spicepod)
141- . build ( ) ;
142- let instance = SpicedInstance :: external ( & args. test_args . common . spiced_path ) ;
143- ( app, instance)
144- } else {
145- let ( app, start_request) = get_app_and_start_request ( & args. test_args . common ) . await ?;
146- let instance = SpicedInstance :: start ( start_request) . await ?;
147- ( app, instance)
148- } ;
149-
150- spiced_instance
151- . wait_for_ready ( Duration :: from_secs ( args. test_args . common . ready_wait ) )
152- . await ?;
127+ let sut_name = "spicebench-sut" ;
153128
154- // Build resource with attributes known upfront, before creating telemetry.
155- // This ensures the SdkMeterProvider is created with the correct resource,
156- // so all metrics (including HealthMonitor) have proper resource attributes.
157- let spiced_version = spiced_instance. version ( ) . to_string ( ) ;
158129 let spiced_commit_sha =
159130 std:: env:: var ( "SPICED_COMMIT" ) . unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
160131 let spicebench_commit_sha = git:: get_commit_sha ( ) ;
@@ -164,10 +135,9 @@ pub(crate) async fn run(
164135 let query_set = args. test_args . load_query_set ( ) ?;
165136 let load_resource = Resource :: builder_empty ( )
166137 . with_attributes ( vec ! [
167- KeyValue :: new( "service.name" , "spicebench" ) ,
168- KeyValue :: new( "type" , "load_test" ) ,
169- KeyValue :: new( "name" , app. name. clone( ) ) ,
170- KeyValue :: new( "spiced_version" , spiced_version) ,
138+ KeyValue :: new( "service.name" , sut_name. to_string( ) ) ,
139+ KeyValue :: new( "type" , "spicebench" ) ,
140+ KeyValue :: new( "name" , sut_name) ,
171141 KeyValue :: new( "query_set" , query_set. to_string( ) ) ,
172142 KeyValue :: new( "spicebench_commit_sha" , spicebench_commit_sha) ,
173143 KeyValue :: new( "spiced_commit_sha" , spiced_commit_sha) ,
@@ -195,70 +165,10 @@ pub(crate) async fn run(
195165 // Create telemetry with resource upfront, before any metrics calls
196166 let telemetry = super :: create_telemetry_with_resource ( & args. test_args . common , load_resource) ;
197167
198- let health_monitor = HealthMonitor :: spawn ( ) ?;
199-
200168 // Create the appropriate query executor based on args
201- let executor = super :: create_query_executor ( & args. test_args , & spiced_instance, adbc_conn) . await ?;
202-
203- // warm up run
204- println ! ( "Performing warm up" ) ;
205-
206- let ( baseline_query_set, test_builder) = super :: build_test_with_validation (
207- & args. test_args ,
208- NotStarted :: new ( )
209- . with_parallel_count ( args. test_args . common . concurrency )
210- . with_end_condition ( EndCondition :: QuerySetCompleted ( 1 ) )
211- . with_query_executor ( executor. clone ( ) ) ,
212- )
213- . await ?;
214-
215- let warm_up = SpiceTest :: < NotStarted > :: new ( app. name . clone ( ) , test_builder)
216- . with_spiced_instance ( spiced_instance)
217- . with_progress_bars ( !args. test_args . common . disable_progress_bars )
218- . start ( ) ?;
219-
220- let spiced_instance = warm_up. wait ( ) . await ?. end ( ) ?;
221-
222- let test_duration = Duration :: from_secs ( args. test_args . common . duration ) ;
169+ let executor = super :: create_query_executor ( & args. test_args , None , adbc_conn) . await ?;
223170
224- // Calculate baseline duration: 10% of target time, min 1min, max 10min
225- let baseline_duration_secs = ( test_duration. as_secs ( ) / 10 ) . clamp ( 60 , 600 ) ;
226- let baseline_duration = Duration :: from_secs ( baseline_duration_secs) ;
227-
228- // baseline run
229- println ! ( "Running baseline throughput test for {baseline_duration_secs}s" , ) ;
230-
231- let ( _, test_builder) = super :: build_test_with_validation (
232- & args. test_args ,
233- NotStarted :: new ( )
234- . with_parallel_count ( args. test_args . common . concurrency )
235- . with_end_condition ( EndCondition :: Duration ( baseline_duration) )
236- . with_query_executor ( executor. clone ( ) ) ,
237- )
238- . await ?;
239-
240- let baseline_test = SpiceTest :: new ( app. name . clone ( ) , test_builder)
241- . with_spiced_instance ( spiced_instance)
242- . with_progress_bars ( !args. test_args . common . disable_progress_bars )
243- . start ( ) ?;
244-
245- let test = baseline_test. wait ( ) . await ?;
246- let baseline_percentiles = test. get_query_durations ( ) . percentile ( 99.0 ) ?;
247-
248- let baseline_metrics: QueryMetrics < _ , NoExtendedMetrics > = test. collect ( TestType :: Load ) ?;
249- println ! ( "Baseline metrics:" ) ;
250- let records = baseline_metrics. build_records ( ) ?;
251- print_batches ( & records) ?;
252- let spiced_instance = test. end ( ) ?;
253- let memory_token = CancellationToken :: new ( ) ;
254- // Memory monitoring is only available for owned spiced instances (not external)
255- let memory_readings = spiced_instance
256- . process ( )
257- . ok ( )
258- . map ( |p| p. watch_memory ( & memory_token) ) ;
259-
260- // load test
261- println ! ( "Running load test" ) ;
171+ println ! ( "Starting Spicebench run" ) ;
262172
263173 let load_end_condition = if args. run_until_stopped {
264174 EndCondition :: Unlimited
@@ -321,8 +231,7 @@ pub(crate) async fn run(
321231 . map ( test_framework:: queries:: QueryOverrides :: from) ;
322232 let _queries = query_set. get_queries ( query_overrides, None , None ) . await ?;
323233
324- let throughput_test = SpiceTest :: < NotStarted > :: new ( app. name . clone ( ) , test_builder)
325- . with_spiced_instance ( spiced_instance)
234+ let throughput_test = SpiceTest :: < NotStarted > :: new ( "spicebench" . into ( ) , test_builder)
326235 . with_progress_bars ( !args. test_args . common . disable_progress_bars )
327236 . start ( ) ?;
328237 let shutdown_token = throughput_test. cancellation_token ( ) ;
@@ -338,26 +247,14 @@ pub(crate) async fn run(
338247 } {
339248 Ok ( test) => test,
340249 Err ( e) => {
341- if let Some ( readings) = memory_readings {
342- let _ = observe_memory ( memory_token, readings) . await ;
343- }
344250 return Err ( e) ;
345251 }
346252 } ;
347- let test_durations = test. get_query_durations ( ) . statistical_set ( ) ?;
348-
349253 // Get all query durations for overall statistics before ending the test
350254 let all_durations = test. get_query_durations ( ) . clone ( ) ;
351255 let all_duration_values: Vec < _ > = all_durations. values ( ) . flatten ( ) . copied ( ) . collect ( ) ;
352256
353257 let metrics: QueryMetrics < _ , NoExtendedMetrics > = test. collect ( TestType :: Load ) ?;
354- let mut spiced_instance = test. end ( ) ?;
355- let ( max_memory, median_memory) = if let Some ( readings) = memory_readings {
356- observe_memory ( memory_token, readings) . await ?
357- } else {
358- println ! ( "Memory monitoring not available for external spiced instances" ) ;
359- ( 0.0 , 0.0 )
360- } ;
361258
362259 // Record per-query metrics for load test
363260 for query in & metrics. metrics {
@@ -384,8 +281,6 @@ pub(crate) async fn run(
384281 }
385282 crate :: metrics:: TEST_DURATION
386283 . record ( ( metrics. finished_at - metrics. started_at ) . try_into ( ) ?, & [ ] ) ;
387- crate :: metrics:: PEAK_MEMORY_USAGE . record ( max_memory * 1024.0 , & [ ] ) ;
388- crate :: metrics:: MEDIAN_MEMORY_USAGE . record ( median_memory * 1024.0 , & [ ] ) ;
389284
390285 // Query throughput metrics
391286 let total_iterations: u64 = metrics
@@ -424,92 +319,18 @@ pub(crate) async fn run(
424319 ) ;
425320 }
426321
427- println ! ( "Baseline metrics:" ) ;
428- let baseline_records = baseline_metrics. build_records ( ) ?;
429- print_batches ( & baseline_records) ?;
430- println ! ( "{}" , vec![ "-" ; 30 ] . join( "" ) ) ;
431322 println ! ( "Load test metrics:" ) ;
432- let records = metrics. with_memory_usage ( max_memory ) . build_records ( ) ?;
323+ let records = metrics. build_records ( ) ?;
433324 print_batches ( & records) ?;
434325
435- let health_report = health_monitor. stop ( ) . await ;
436-
437326 // Shutdown streaming exporter before emitting final telemetry
438327 if let Some ( exporter) = streaming_exporter {
439328 exporter. shutdown ( ) . await ;
440329 }
441330
442331 telemetry. emit ( ) . await ?;
443332
444- spiced_instance. stop ( ) ?;
445- let health_report = health_report?;
446-
447- let mut test_passed = true ;
448- let mut yellow_measurements = 0 ;
449-
450- // Use baseline_queries that represent unique query names, otherwise the same failure
451- // could be reported multiple times for each parameterized query params set variation
452- for query in baseline_query_set
453- . get_queries ( query_overrides, None , None )
454- . await ?
455- {
456- let Some ( baseline_percentile) = baseline_percentiles. get ( & query. name ) else {
457- // Query Failed, no percentile statistics recorded
458- continue ;
459- } ;
460-
461- let Some ( duration) = test_durations. get ( & query. name ) else {
462- return Err ( anyhow:: anyhow!(
463- "Query {} not found in test durations" ,
464- query. name
465- ) ) ;
466- } ;
467-
468- let percentile_99th = duration. percentile ( 99.0 ) ?;
469- if percentile_99th. as_millis ( ) < 1000 {
470- continue ; // skip queries that are too fast to be meaningful
471- }
472-
473- let percentile_ratio =
474- ( ( percentile_99th. as_secs_f64 ( ) / baseline_percentile. as_secs_f64 ( ) ) - 1.0 ) * 100.0 ;
475-
476- // yellow measurements = 10% to 20% increase
477- // red measurements = > 20% increase
478- let ( yellow, red) = (
479- percentile_ratio > 10.0 && percentile_ratio <= 20.0 ,
480- percentile_ratio > 20.0 ,
481- ) ;
482-
483- if red {
484- println ! (
485- "FAIL - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile" ,
486- query = query. name
487- ) ;
488- test_passed = false ;
489- } else if yellow {
490- println ! (
491- "WARN - Query {query} has a 99th percentile that increased {percentile_ratio}% of the baseline 99th percentile" ,
492- query = query. name
493- ) ;
494- yellow_measurements += 1 ;
495- }
496- }
497-
498- let mut failure_messages = Vec :: new ( ) ;
499- if !args. no_error && yellow_measurements >= 3 {
500- failure_messages. push ( "Load test failed due to too many yellow measurements" . to_string ( ) ) ;
501- }
502- if !args. no_error && !test_passed {
503- failure_messages. push ( "Load test failed." . to_string ( ) ) ;
504- }
505- if let Some ( message) = health_report. failure_message ( ) {
506- failure_messages. push ( message) ;
507- }
508-
509- if !failure_messages. is_empty ( ) {
510- return Err ( anyhow:: anyhow!( failure_messages. join( "\n " ) ) ) ;
511- }
333+ println ! ( "Spicebench run completed" ) ;
512334
513- println ! ( "Load test completed" ) ;
514335 Ok ( ( ) )
515336}
0 commit comments