@@ -15,20 +15,18 @@ limitations under the License.
1515*/
1616#![ allow( dead_code) ]
1717
18- use crate :: args:: BenchRunArgs ;
18+ use crate :: { args:: CommonArgs , commands :: adbc_executor , scenario :: Scenario } ;
1919use std:: sync:: Arc ;
2020use std:: time:: Duration ;
2121use system_adapter_protocol:: MetricsResponse ;
2222use test_framework:: {
2323 TestType , anyhow,
2424 arrow:: util:: pretty:: print_batches,
25- git,
2625 metrics:: { MetricCollector , NoExtendedMetrics , QueryMetrics , QueryStatus , StatisticsCollector } ,
2726 opentelemetry:: KeyValue ,
28- opentelemetry_sdk:: Resource ,
2927 spicetest:: {
3028 SpiceTest ,
31- datasets:: { EndCondition , NotStarted } ,
29+ datasets:: NotStarted ,
3230 } ,
3331 telemetry:: streaming:: StreamingOtlpExporter ,
3432} ;
@@ -115,82 +113,31 @@ fn spawn_sut_metrics_scraper(
115113
116114#[ expect( clippy:: too_many_lines) ]
117115pub ( crate ) async fn run (
118- args : & BenchRunArgs ,
119- adbc_conn : Option < adbc_client:: AdbcConnection > ,
116+ scenario : & Scenario ,
117+ common_args : & CommonArgs ,
118+ adbc_conn : adbc_client:: AdbcConnection ,
120119) -> anyhow:: Result < ( ) > {
121- if args. test_args . common . concurrency < 2 {
122- return Err ( anyhow:: anyhow!(
123- "Concurrency should be greater than 1 for a load test"
124- ) ) ;
125- }
126-
127- let sut_name = "spicebench-sut" ;
128-
129- let spiced_commit_sha =
130- std:: env:: var ( "SPICED_COMMIT" ) . unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
131- let spicebench_commit_sha = git:: get_commit_sha ( ) ;
132- let branch_name = git:: get_branch_name ( ) ;
133- let spicepod = args. test_args . common . spicepod_path . display ( ) . to_string ( ) ;
134-
135- let query_set = args. test_args . load_query_set ( ) ?;
136- let load_resource = Resource :: builder_empty ( )
137- . with_attributes ( vec ! [
138- KeyValue :: new( "service.name" , sut_name. to_string( ) ) ,
139- KeyValue :: new( "type" , "spicebench" ) ,
140- KeyValue :: new( "name" , sut_name) ,
141- KeyValue :: new( "query_set" , query_set. to_string( ) ) ,
142- KeyValue :: new( "spicebench_commit_sha" , spicebench_commit_sha) ,
143- KeyValue :: new( "spiced_commit_sha" , spiced_commit_sha) ,
144- KeyValue :: new( "branch_name" , branch_name) ,
145- KeyValue :: new( "concurrency" , args. test_args. common. concurrency. to_string( ) ) ,
146- KeyValue :: new( "spicepod" , spicepod) ,
147- KeyValue :: new(
148- "param_set_variants" ,
149- args. test_args
150- . random_param_set_count
151- . unwrap_or( 1 )
152- . to_string( ) ,
153- ) ,
154- KeyValue :: new(
155- "protocol" ,
156- if args. test_args. http_clients {
157- "http"
158- } else {
159- "flight"
160- } ,
161- ) ,
162- ] )
163- . build ( ) ;
164-
165- // Create telemetry with resource upfront, before any metrics calls
166- let telemetry = super :: create_telemetry_with_resource ( & args. test_args . common , load_resource) ;
167-
120+ scenario. load_query_set ( ) ?;
168121 // Create the appropriate query executor based on args
169- let executor = super :: create_query_executor ( & args . test_args , None , adbc_conn) . await ? ;
122+ let executor = Box :: new ( adbc_executor :: AdbcDirectQueryExecutor :: new ( adbc_conn) ) ;
170123
171- println ! ( "Starting Spicebench run " ) ;
124+ println ! ( "Running benchmark " ) ;
172125
173- let load_end_condition = if args. run_until_stopped {
174- EndCondition :: Unlimited
175- } else {
176- EndCondition :: Duration ( Duration :: from_secs ( args. test_args . common . duration ) )
177- } ;
126+ let load_end_condition = scenario. end_condition ( ) ;
178127
179128 // Create streaming OTLP exporter if OTLP endpoint is configured
180- let streaming_exporter = args
181- . test_args
182- . common
129+ let streaming_exporter = common_args
183130 . otlp_endpoint
184131 . as_ref ( )
185132 . map ( |endpoint| StreamingOtlpExporter :: spawn ( endpoint. clone ( ) ) ) ;
186133
187134 // Spawn SUT metrics scraper if --scrape-sut-metrics is enabled and a system adapter is configured
188135 let sut_scraper_token = CancellationToken :: new ( ) ;
189- let sut_scraper_handle = if args . test_args . common . scrape_sut_metrics
190- && ( args . test_args . common . system_adapter_stdio_cmd . is_some ( )
191- || args . test_args . common . system_adapter_http_url . is_some ( ) )
136+ let sut_scraper_handle = if common_args . scrape_sut_metrics
137+ && ( common_args . system_adapter_stdio_cmd . is_some ( )
138+ || common_args . system_adapter_http_url . is_some ( ) )
192139 {
193- let adapter = super :: connect_system_adapter ( & args . test_args . common ) . await ?;
140+ let adapter = super :: connect_system_adapter ( common_args ) . await ?;
194141 let run_id = uuid:: Uuid :: new_v4 ( ) ;
195142 println ! ( "SUT metrics scraping enabled (run_id={run_id})" ) ;
196143 Some ( spawn_sut_metrics_scraper (
@@ -204,43 +151,33 @@ pub(crate) async fn run(
204151 } ;
205152
206153 // Record client concurrency as a gauge
207- crate :: metrics:: ACTIVE_CONNECTIONS . record (
208- args. test_args . common . concurrency . try_into ( ) . unwrap_or ( 0 ) ,
209- & [ ] ,
210- ) ;
154+ crate :: metrics:: ACTIVE_CONNECTIONS . record ( common_args. concurrency . try_into ( ) . unwrap_or ( 0 ) , & [ ] ) ;
211155
212156 let mut test_builder = NotStarted :: new ( )
213- . with_parallel_count ( args . test_args . common . concurrency )
157+ . with_parallel_count ( common_args . concurrency )
214158 . with_end_condition ( load_end_condition)
215- . with_query_executor ( executor)
216- . with_query_duration_threshold ( args. test_args . mark_query_failed_if_exceeds ) ;
159+ . with_query_executor ( executor) ;
217160
218161 // Add streaming metrics sender if exporter is configured
219162 if let Some ( exporter) = & streaming_exporter {
220163 test_builder = test_builder. with_streaming_metrics ( exporter. sender ( ) ) ;
221164 }
222165
223166 let ( query_set, test_builder) =
224- super :: build_test_with_validation ( & args . test_args , test_builder) . await ?;
167+ super :: build_test_with_validation ( scenario , test_builder) . await ?;
225168
226- // Use the same query overrides that were applied in build_test_with_validation
227- let query_overrides = args
228- . test_args
229- . query_overrides
230- . clone ( )
231- . map ( test_framework:: queries:: QueryOverrides :: from) ;
232- let _queries = query_set. get_queries ( query_overrides, None , None ) . await ?;
169+ let _queries = query_set. get_queries ( None , None , None ) . await ?;
233170
234- let throughput_test = SpiceTest :: < NotStarted > :: new ( "spicebench" . into ( ) , test_builder)
235- . with_progress_bars ( !args . test_args . common . disable_progress_bars )
171+ let throughput_test = SpiceTest :: < NotStarted > :: new ( scenario . to_string ( ) , test_builder)
172+ . with_progress_bars ( false )
236173 . start ( ) ?;
237174 let shutdown_token = throughput_test. cancellation_token ( ) ;
238175 let test_future = throughput_test. wait ( ) ;
239176 tokio:: pin!( test_future) ;
240177 let test = match tokio:: select! {
241178 res = & mut test_future => res,
242179 _ = signal:: ctrl_c( ) => {
243- println!( "Interrupt received, stopping load test ..." ) ;
180+ println!( "Interrupt received, stopping benchmark ..." ) ;
244181 shutdown_token. cancel( ) ;
245182 test_future. await
246183 }
@@ -250,11 +187,14 @@ pub(crate) async fn run(
250187 return Err ( e) ;
251188 }
252189 } ;
190+ test. get_query_durations ( ) . statistical_set ( ) ?;
191+
253192 // Get all query durations for overall statistics before ending the test
254193 let all_durations = test. get_query_durations ( ) . clone ( ) ;
255194 let all_duration_values: Vec < _ > = all_durations. values ( ) . flatten ( ) . copied ( ) . collect ( ) ;
256195
257196 let metrics: QueryMetrics < _ , NoExtendedMetrics > = test. collect ( TestType :: Load ) ?;
197+ let _ = test. end ( ) ;
258198
259199 // Record per-query metrics for load test
260200 for query in & metrics. metrics {
@@ -283,13 +223,8 @@ pub(crate) async fn run(
283223 . record ( ( metrics. finished_at - metrics. started_at ) . try_into ( ) ?, & [ ] ) ;
284224
285225 // Query throughput metrics
286- let total_iterations: u64 = metrics
287- . metrics
288- . iter ( )
289- . map ( |q| q. iterations as u64 )
290- . sum ( ) ;
291- let test_duration_secs =
292- ( metrics. finished_at - metrics. started_at ) as f64 / 1000.0 ;
226+ let total_iterations: u64 = metrics. metrics . iter ( ) . map ( |q| q. iterations as u64 ) . sum ( ) ;
227+ let test_duration_secs = ( metrics. finished_at - metrics. started_at ) as f64 / 1000.0 ;
293228 crate :: metrics:: QUERIES_TOTAL . add ( total_iterations, & [ ] ) ;
294229 if test_duration_secs > 0.0 {
295230 let qps = total_iterations as f64 / test_duration_secs;
@@ -300,8 +235,7 @@ pub(crate) async fn run(
300235 . map ( |n| n. get ( ) as f64 )
301236 . unwrap_or ( 1.0 ) ;
302237 if cpu_cores > 0.0 {
303- crate :: metrics:: EFFICIENCY_QUERIES_PER_CORE
304- . record ( qps / cpu_cores, & [ ] ) ;
238+ crate :: metrics:: EFFICIENCY_QUERIES_PER_CORE . record ( qps / cpu_cores, & [ ] ) ;
305239 }
306240 }
307241
@@ -319,7 +253,8 @@ pub(crate) async fn run(
319253 ) ;
320254 }
321255
322- println ! ( "Load test metrics:" ) ;
256+ println ! ( "{}" , vec![ "-" ; 30 ] . join( "" ) ) ;
257+ println ! ( "Benchmark metrics:" ) ;
323258 let records = metrics. build_records ( ) ?;
324259 print_batches ( & records) ?;
325260
@@ -328,9 +263,6 @@ pub(crate) async fn run(
328263 exporter. shutdown ( ) . await ;
329264 }
330265
331- telemetry. emit ( ) . await ?;
332-
333- println ! ( "Spicebench run completed" ) ;
334-
266+ println ! ( "Benchmark completed" ) ;
335267 Ok ( ( ) )
336268}
0 commit comments