@@ -15,7 +15,7 @@ limitations under the License.
1515*/
1616
1717use std:: sync:: Arc ;
18- use std:: time:: { SystemTime , UNIX_EPOCH } ;
18+ use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
1919
2020use arrow:: array:: { Array , RecordBatch , StringArray , TimestampMicrosecondArray } ;
2121use arrow:: compute;
@@ -34,7 +34,7 @@ use system_adapter_protocol::DatasetConfig as ProtocolDatasetConfig;
3434use tokio:: sync:: watch;
3535use tokio:: task:: { JoinHandle , JoinSet } ;
3636use tokio_util:: sync:: CancellationToken ;
37- use tracing:: { debug, error, info, warn} ;
37+ use tracing:: { debug, error, info, trace , warn} ;
3838
3939use crate :: sink:: { InsertOp , Sink } ;
4040
@@ -55,7 +55,7 @@ const TARGET_BATCH_ROWS: usize = 8_192 * 4;
5555
5656/// Maximum number of in-flight sink writes allowed per table task when the
5757/// current segment set is insert-only.
58- const MAX_IN_FLIGHT_TABLE_WRITES : usize = 2 ;
58+ const MAX_IN_FLIGHT_TABLE_WRITES : usize = 1 ;
5959
6060/// Maximum number of concurrent source logical-batch reads per ETL table task.
6161const MAX_IN_FLIGHT_SOURCE_BATCH_READS : usize = 2 ;
@@ -113,6 +113,33 @@ fn build_partition_columns(dataset_columns: Vec<String>) -> Vec<String> {
113113 columns
114114}
115115
116+ fn record_timing_sample_ms ( samples : & StdArc < StdMutex < Vec < u64 > > > , elapsed : Duration ) {
117+ let elapsed_ms = elapsed. as_millis ( ) . min ( u128:: from ( u64:: MAX ) ) as u64 ;
118+ let mut values = samples. lock ( ) . expect ( "timing samples lock poisoned" ) ;
119+ values. push ( elapsed_ms) ;
120+ }
121+
122+ fn take_median_sample_ms ( samples : & StdArc < StdMutex < Vec < u64 > > > ) -> Option < ( f64 , usize ) > {
123+ let mut values = {
124+ let mut guard = samples. lock ( ) . expect ( "timing samples lock poisoned" ) ;
125+ if guard. is_empty ( ) {
126+ return None ;
127+ }
128+ std:: mem:: take ( & mut * guard)
129+ } ;
130+
131+ values. sort_unstable ( ) ;
132+ let count = values. len ( ) ;
133+ let mid = count / 2 ;
134+ let median_ms = if count % 2 == 1 {
135+ values[ mid] as f64
136+ } else {
137+ ( values[ mid - 1 ] as f64 + values[ mid] as f64 ) / 2.0
138+ } ;
139+
140+ Some ( ( median_ms, count) )
141+ }
142+
116143/// Concatenates small input batches and splits large input batches so each
117144/// resulting batch has at most [`TARGET_BATCH_ROWS`] rows.
118145///
@@ -1367,11 +1394,13 @@ async fn run_pipeline(
13671394 "ETL pipeline run started"
13681395 ) ;
13691396
1370- // Shared progress counters for periodic logging.
1397+ // Shared progress counters and timing samples for periodic logging.
13711398 let logical_steps_consumed = StdArc :: new ( AtomicU64 :: new ( 0 ) ) ;
13721399 let batches_processed = StdArc :: new ( AtomicU64 :: new ( 0 ) ) ;
13731400 let rows_processed = StdArc :: new ( AtomicU64 :: new ( 0 ) ) ;
13741401 let tables_finished_counter = StdArc :: new ( AtomicU64 :: new ( 0 ) ) ;
1402+ let batch_retrieval_samples_ms = StdArc :: new ( StdMutex :: new ( Vec :: < u64 > :: new ( ) ) ) ;
1403+ let sink_write_samples_ms = StdArc :: new ( StdMutex :: new ( Vec :: < u64 > :: new ( ) ) ) ;
13751404 let pipeline_start = Instant :: now ( ) ;
13761405
13771406 // Spawn periodic progress logger (every 5 seconds).
@@ -1380,6 +1409,8 @@ async fn run_pipeline(
13801409 let batches_processed = StdArc :: clone ( & batches_processed) ;
13811410 let rows_processed = StdArc :: clone ( & rows_processed) ;
13821411 let tables_finished_counter = StdArc :: clone ( & tables_finished_counter) ;
1412+ let batch_retrieval_samples_ms = StdArc :: clone ( & batch_retrieval_samples_ms) ;
1413+ let sink_write_samples_ms = StdArc :: clone ( & sink_write_samples_ms) ;
13831414 let cancel = cancel. clone ( ) ;
13841415 tokio:: spawn ( async move {
13851416 let mut interval = tokio:: time:: interval ( std:: time:: Duration :: from_secs ( 5 ) ) ;
@@ -1405,6 +1436,23 @@ async fn run_pipeline(
14051436 rows_per_sec = format!( "{:.1}" , rows_done as f64 / secs) ,
14061437 "ETL progress"
14071438 ) ;
1439+
1440+ let retrieval_summary = take_median_sample_ms( & batch_retrieval_samples_ms) ;
1441+ let sink_write_summary = take_median_sample_ms( & sink_write_samples_ms) ;
1442+ if retrieval_summary. is_some( ) || sink_write_summary. is_some( ) {
1443+ let ( retrieval_median_ms, retrieval_samples) =
1444+ retrieval_summary. unwrap_or( ( 0.0 , 0 ) ) ;
1445+ let ( sink_write_median_ms, sink_write_samples) =
1446+ sink_write_summary. unwrap_or( ( 0.0 , 0 ) ) ;
1447+
1448+ debug!(
1449+ retrieval_samples,
1450+ retrieval_median_ms = format!( "{retrieval_median_ms:.1}" ) ,
1451+ sink_write_samples,
1452+ sink_write_median_ms = format!( "{sink_write_median_ms:.1}" ) ,
1453+ "ETL batch timing medians (last 5s)"
1454+ ) ;
1455+ }
14081456 }
14091457 ( ) = cancel. cancelled( ) => break ,
14101458 }
@@ -1490,12 +1538,16 @@ async fn run_pipeline(
14901538 let work_state = Arc :: clone ( & work_state) ;
14911539 let logical_steps_consumed = StdArc :: clone ( & logical_steps_consumed) ;
14921540 let last_created_at = Arc :: clone ( & last_created_at_us) ;
1541+ let batch_retrieval_samples_ms = StdArc :: clone ( & batch_retrieval_samples_ms) ;
1542+ let sink_write_samples_ms = StdArc :: clone ( & sink_write_samples_ms) ;
14931543 let partition_columns = table_partition_columns
14941544 . get ( & table_name)
14951545 . cloned ( )
14961546 . unwrap_or_else ( || vec ! [ CREATED_AT_COLUMN . to_string( ) ] ) ;
14971547
14981548 join_set. spawn ( async move {
1549+ let retrieval_started_at = Instant :: now ( ) ;
1550+
14991551 // 1. Read from source; keep reading subsequent table batches
15001552 // until we accumulate enough rows for efficient downstream work.
15011553 let ( source_batches, key_columns, table_finished, consumed_work_units, rows_read) =
@@ -1545,6 +1597,12 @@ async fn run_pipeline(
15451597 ) ) ;
15461598 }
15471599 } ;
1600+
1601+ record_timing_sample_ms (
1602+ & batch_retrieval_samples_ms,
1603+ retrieval_started_at. elapsed ( ) ,
1604+ ) ;
1605+
15481606 for batch in & coalesced {
15491607 let segments = match split_batch_by_op ( batch, & key_columns) {
15501608 Ok ( s) => s,
@@ -1561,6 +1619,8 @@ async fn run_pipeline(
15611619 }
15621620 } ;
15631621
1622+ let write_started_at = Instant :: now ( ) ;
1623+
15641624 if let Err ( err_msg) = write_segments_for_batch (
15651625 Arc :: clone ( & data_sink) ,
15661626 & table_name,
@@ -1579,13 +1639,16 @@ async fn run_pipeline(
15791639 ) ;
15801640 return Err ( err_msg) ;
15811641 }
1642+
1643+ record_timing_sample_ms ( & sink_write_samples_ms, write_started_at. elapsed ( ) ) ;
1644+
15821645 let tracker = last_created_at
15831646 . get ( & table_name)
15841647 . expect ( "table missing from last_created_at map" ) ;
15851648 tracker. store ( batch_ts, Ordering :: Relaxed ) ;
15861649 }
15871650
1588- debug ! (
1651+ trace ! (
15891652 table = %table_name,
15901653 batch_id,
15911654 "Table batch processed"
@@ -1626,7 +1689,7 @@ async fn run_pipeline(
16261689 batches_processed. fetch_add ( step_batch_count, Ordering :: Relaxed ) ;
16271690 rows_processed. fetch_add ( step_rows_count, Ordering :: Relaxed ) ;
16281691
1629- debug ! (
1692+ trace ! (
16301693 batch_id,
16311694 outer_steps_processed,
16321695 steps_processed = logical_steps_consumed. load( Ordering :: Relaxed ) ,
0 commit comments