@@ -16,7 +16,9 @@ limitations under the License.
1616#![ allow( dead_code) ]
1717
1818use crate :: { args:: CommonArgs , commands:: adbc_executor, scenario:: Scenario } ;
19+ use arrow:: array:: { Array , TimestampMicrosecondArray } ;
1920use etl:: { ETLPipeline , PipelineState , StopReason } ;
21+ use std:: collections:: HashMap ;
2022use std:: sync:: Arc ;
2123use std:: time:: Duration ;
2224use system_adapter_protocol:: MetricsResponse ;
@@ -118,6 +120,81 @@ fn spawn_sut_metrics_scraper(
118120 } )
119121}
120122
123+ /// Spawn a task that periodically queries `SELECT MAX(__created_at)` for each
124+ /// table and records the freshness delay (`now − max_created_at`).
125+ ///
126+ /// Returns a map of table name → vec of freshness samples (in milliseconds).
127+ fn spawn_e2e_latency_check (
128+ conn : Arc < std:: sync:: Mutex < adbc_client:: AdbcConnection > > ,
129+ table_names : Vec < String > ,
130+ token : CancellationToken ,
131+ interval : Duration ,
132+ ) -> tokio:: task:: JoinHandle < HashMap < String , Vec < f64 > > > {
133+ tokio:: spawn ( async move {
134+ let mut samples_by_table: HashMap < String , Vec < f64 > > = table_names
135+ . iter ( )
136+ . map ( |t| ( t. clone ( ) , Vec :: new ( ) ) )
137+ . collect ( ) ;
138+ let mut ticker = tokio:: time:: interval ( interval) ;
139+ loop {
140+ tokio:: select! {
141+ _ = ticker. tick( ) => { }
142+ ( ) = token. cancelled( ) => break ,
143+ }
144+
145+ let conn = Arc :: clone ( & conn) ;
146+ let tables = table_names. clone ( ) ;
147+ let results = tokio:: task:: spawn_blocking ( move || {
148+ let now_us = std:: time:: SystemTime :: now ( )
149+ . duration_since ( std:: time:: UNIX_EPOCH )
150+ . unwrap_or_default ( )
151+ . as_micros ( ) as i64 ;
152+ let mut out: Vec < ( String , Option < f64 > ) > = Vec :: new ( ) ;
153+ let mut guard = match conn. lock ( ) {
154+ Ok ( g) => g,
155+ Err ( e) => {
156+ eprintln ! ( "E2E latency scraper: lock poisoned: {e}" ) ;
157+ return out;
158+ }
159+ } ;
160+ for table in & tables {
161+ let sql = format ! ( "SELECT MAX(__created_at) FROM {table}" ) ;
162+ match guard. query ( & sql) {
163+ Ok ( batches) => {
164+ let sample = batches. first ( ) . and_then ( |batch| {
165+ let col = batch. column ( 0 ) ;
166+ let ts_array =
167+ col. as_any ( ) . downcast_ref :: < TimestampMicrosecondArray > ( ) ?;
168+ if ts_array. is_null ( 0 ) {
169+ return None ;
170+ }
171+ let max_ts_us = ts_array. value ( 0 ) ;
172+ Some ( ( now_us - max_ts_us) as f64 / 1000.0 )
173+ } ) ;
174+ out. push ( ( table. clone ( ) , sample) ) ;
175+ }
176+ Err ( e) => {
177+ eprintln ! ( "E2E latency checker: query failed for {table}: {e}" ) ;
178+ out. push ( ( table. clone ( ) , None ) ) ;
179+ }
180+ }
181+ }
182+ out
183+ } )
184+ . await ;
185+
186+ if let Ok ( results) = results {
187+ for ( table, sample) in results {
188+ if let Some ( ms) = sample {
189+ samples_by_table. entry ( table) . or_default ( ) . push ( ms) ;
190+ }
191+ }
192+ }
193+ }
194+ samples_by_table
195+ } )
196+ }
197+
121198#[ expect( clippy:: too_many_lines) ]
122199pub ( crate ) async fn run (
123200 scenario : & Scenario ,
@@ -141,8 +218,12 @@ pub(crate) async fn run(
141218 // Create telemetry with resource upfront, before any metrics calls
142219 let telemetry = super :: create_telemetry_with_resource ( common_args, load_resource) ;
143220
144- // Create the appropriate query executor based on args
145- let executor = Box :: new ( adbc_executor:: AdbcDirectQueryExecutor :: new ( adbc_conn) ) ;
221+ // Create the appropriate query executor based on args, sharing the ADBC connection
222+ // so the freshness scraper can also query through it.
223+ let shared_conn = Arc :: new ( std:: sync:: Mutex :: new ( adbc_conn) ) ;
224+ let executor = Box :: new ( adbc_executor:: AdbcDirectQueryExecutor :: from_shared (
225+ Arc :: clone ( & shared_conn) ,
226+ ) ) ;
146227
147228 println ! ( "Running benchmark" ) ;
148229
@@ -174,6 +255,16 @@ pub(crate) async fn run(
174255 None
175256 } ;
176257
258+ // Spawn e2e checker
259+ let table_names: Vec < String > = etl_pipeline. dataset ( ) . tables ( ) . keys ( ) . cloned ( ) . collect ( ) ;
260+ let e2e_latency_token = CancellationToken :: new ( ) ;
261+ let e2e_latency_handle = spawn_e2e_latency_check (
262+ Arc :: clone ( & shared_conn) ,
263+ table_names,
264+ e2e_latency_token. clone ( ) ,
265+ Duration :: from_secs ( 5 ) ,
266+ ) ;
267+
177268 // Record client concurrency as a gauge
178269 crate :: metrics:: ACTIVE_CONNECTIONS . record (
179270 common_args. concurrency . try_into ( ) . unwrap_or ( 0 ) ,
@@ -325,6 +416,33 @@ pub(crate) async fn run(
325416 ) ;
326417 }
327418
419+ // Stop freshness scraper and emit P99 metrics
420+ e2e_latency_token. cancel ( ) ;
421+ if let Ok ( samples_by_table) = e2e_latency_handle. await {
422+ let mut all_samples: Vec < f64 > = Vec :: new ( ) ;
423+ for ( table_name, samples) in & samples_by_table {
424+ if !samples. is_empty ( ) {
425+ all_samples. extend ( samples) ;
426+ let mut sorted = samples. clone ( ) ;
427+ sorted. sort_by ( |a, b| a. partial_cmp ( b) . unwrap ( ) ) ;
428+ let idx = ( ( sorted. len ( ) as f64 * 0.99 ) as usize ) . min ( sorted. len ( ) - 1 ) ;
429+ let p99 = sorted[ idx] ;
430+ let attrs = vec ! [ KeyValue :: new( "table_name" , table_name. clone( ) ) ] ;
431+ crate :: metrics:: E2E_LATENCY_P99_MS . record ( p99, & attrs) ;
432+ }
433+ }
434+ if !all_samples. is_empty ( ) {
435+ all_samples. sort_by ( |a, b| a. partial_cmp ( b) . unwrap ( ) ) ;
436+ let idx = ( ( all_samples. len ( ) as f64 * 0.99 ) as usize ) . min ( all_samples. len ( ) - 1 ) ;
437+ let p99 = all_samples[ idx] ;
438+ crate :: metrics:: E2E_LATENCY_P99_MS . record ( p99, & [ KeyValue :: new ( "table_name" , "" ) ] ) ;
439+ println ! (
440+ "Data freshness P99: {p99:.1}ms ({} samples)" ,
441+ all_samples. len( )
442+ ) ;
443+ }
444+ }
445+
328446 println ! ( "{}" , vec![ "-" ; 30 ] . join( "" ) ) ;
329447 println ! ( "Benchmark metrics:" ) ;
330448 let records = metrics. build_records ( ) ?;
0 commit comments