@@ -15,6 +15,7 @@ limitations under the License.
1515*/
1616
1717use std:: collections:: HashMap ;
18+ use std:: collections:: VecDeque ;
1819use std:: sync:: atomic:: { AtomicI64 , AtomicU16 , Ordering } ;
1920use std:: sync:: { Arc , Mutex } ;
2021
@@ -72,6 +73,34 @@ const SF1_ROW_COUNTS: &[(&str, u64)] = &[
7273 ( "lineitem" , 6_001_215 ) ,
7374] ;
7475
/// Smallest per-file row cap an environment override may select.
const MIN_TPCH_ROWS_PER_FILE: usize = 32_000;
/// Largest per-file row cap an environment override may select.
const MAX_TPCH_ROWS_PER_FILE: usize = 64_000;
/// Per-file row cap used when no usable override is provided.
const DEFAULT_TPCH_MAX_ROWS_PER_FILE: usize = 48_000;

/// Resolves the maximum number of rows per emitted TPC-H batch/file.
///
/// Reads the `SPICEBENCH_TPCH_MAX_ROWS_PER_FILE` environment variable:
///
/// * unset, not valid Unicode, not parseable as `usize`, or `0` yields
///   [`DEFAULT_TPCH_MAX_ROWS_PER_FILE`];
/// * any other value is clamped into
///   [`MIN_TPCH_ROWS_PER_FILE`]`..=`[`MAX_TPCH_ROWS_PER_FILE`].
fn tpch_max_rows_per_file() -> usize {
    // Collapse "variable missing" and "value is not a usize" into one `None`.
    let requested = match std::env::var("SPICEBENCH_TPCH_MAX_ROWS_PER_FILE") {
        Ok(raw) => raw.parse::<usize>().ok(),
        Err(_) => None,
    };

    match requested {
        // Zero is treated like "not configured" rather than clamped up to the minimum.
        Some(rows) if rows > 0 => rows.clamp(MIN_TPCH_ROWS_PER_FILE, MAX_TPCH_ROWS_PER_FILE),
        _ => DEFAULT_TPCH_MAX_ROWS_PER_FILE,
    }
}
88+
89+ fn split_record_batch ( batch : RecordBatch , max_rows : usize ) -> VecDeque < RecordBatch > {
90+ if batch. num_rows ( ) <= max_rows {
91+ return VecDeque :: from ( [ batch] ) ;
92+ }
93+
94+ let mut out = VecDeque :: new ( ) ;
95+ let mut offset = 0usize ;
96+ while offset < batch. num_rows ( ) {
97+ let len = std:: cmp:: min ( max_rows, batch. num_rows ( ) - offset) ;
98+ out. push_back ( batch. slice ( offset, len) ) ;
99+ offset += len;
100+ }
101+ out
102+ }
103+
75104/// Returns the expected total number of rows for a given table at the
76105/// specified scale factor.
77106fn total_rows_for_table ( table : & str , scale_factor : f64 ) -> u64 {
@@ -364,12 +393,16 @@ pub struct TpchDataset {
364393 mutations : MutationConfig ,
365394 /// Per-table step counter tracking which part to generate next (0-indexed).
366395 table_steps : HashMap < String , AtomicU16 > ,
396+ /// Per-table queue of already-generated chunks waiting to be emitted.
397+ pending_batches : HashMap < String , Mutex < VecDeque < RecordBatch > > > ,
367398 /// Per-table primary key tracking for update/delete targeting.
368399 key_sets : HashMap < String , Mutex < IndexedKeySet < PrimaryKeyValue > > > ,
369400 /// Global monotonically increasing operation counter for replay ordering.
370401 op_counter : AtomicI64 ,
371402 /// The storage backend for reading/writing table metadata.
372403 storage : Arc < dyn DataStorage > ,
404+ /// Maximum number of rows per emitted batch/file.
405+ max_rows_per_file : usize ,
373406}
374407
375408impl TpchDataset {
@@ -394,14 +427,25 @@ impl TpchDataset {
394427 . map ( |( name, _) | ( name. to_string ( ) , AtomicU16 :: new ( 0 ) ) )
395428 . collect ( ) ;
396429
430+ let pending_batches: HashMap < String , Mutex < VecDeque < RecordBatch > > > = TPCH_TABLES
431+ . iter ( )
432+ . map ( |( name, _) | ( name. to_string ( ) , Mutex :: new ( VecDeque :: new ( ) ) ) )
433+ . collect ( ) ;
434+
435+ let max_rows_per_file = tpch_max_rows_per_file ( ) ;
436+
437+ info ! ( max_rows_per_file, "Configured TPCH maximum rows per file" ) ;
438+
397439 Ok ( Self {
398440 scale_factor : config. scale_factor ,
399441 num_steps : config. num_steps ,
400442 mutations : mutations. clone ( ) ,
401443 table_steps,
444+ pending_batches,
402445 key_sets,
403446 op_counter : AtomicI64 :: new ( 0 ) ,
404447 storage,
448+ max_rows_per_file,
405449 } )
406450 }
407451}
@@ -463,6 +507,15 @@ impl Dataset for TpchDataset {
463507 }
464508
465509 async fn raw_next_batch ( & self , table : & str ) -> anyhow:: Result < Option < RecordBatch > > {
510+ if let Some ( queued) = self . pending_batches . get ( table) {
511+ let mut queued = queued
512+ . lock ( )
513+ . map_err ( |e| anyhow:: anyhow!( "lock poisoned: {e}" ) ) ?;
514+ if let Some ( batch) = queued. pop_front ( ) {
515+ return Ok ( Some ( batch) ) ;
516+ }
517+ }
518+
466519 // Each table independently tracks which step (part) it is on.
467520 let step_counter = self
468521 . table_steps
@@ -602,7 +655,23 @@ impl Dataset for TpchDataset {
602655 let op_indices: Vec < i64 > = ( op_base..op_base + total_rows as i64 ) . collect ( ) ;
603656 columns. push ( Arc :: new ( Int64Array :: from ( op_indices) ) ) ;
604657
605- Ok ( Some ( RecordBatch :: try_new ( schema, columns) ?) )
658+ let combined_batch = RecordBatch :: try_new ( schema, columns) ?;
659+ let mut chunks = split_record_batch ( combined_batch, self . max_rows_per_file ) ;
660+
661+ let first = chunks
662+ . pop_front ( )
663+ . ok_or_else ( || anyhow:: anyhow!( "internal error: no chunks produced" ) ) ?;
664+
665+ if !chunks. is_empty ( )
666+ && let Some ( queued) = self . pending_batches . get ( table)
667+ {
668+ let mut queued = queued
669+ . lock ( )
670+ . map_err ( |e| anyhow:: anyhow!( "lock poisoned: {e}" ) ) ?;
671+ queued. extend ( chunks) ;
672+ }
673+
674+ Ok ( Some ( first) )
606675 }
607676
608677 fn tables ( & self ) -> HashMap < String , DatasetTable > {
@@ -716,7 +785,7 @@ mod tests {
716785 }
717786
718787 #[ tokio:: test]
719- async fn tpch_num_batches_matches_emitted_batches_per_table ( ) {
788+ async fn tpch_num_batches_is_a_lower_bound_for_emitted_batches_per_table ( ) {
720789 let dataset = build_dataset ( 1.0 , 7 ) ;
721790
722791 for ( table, _) in TPCH_TABLES {
@@ -731,14 +800,35 @@ mod tests {
731800 emitted_batches += 1 ;
732801 }
733802
734- assert_eq ! (
735- emitted_batches,
736- dataset. num_batches( table) ,
737- "unexpected batch count for table '{table}'"
803+ assert ! (
804+ emitted_batches >= dataset. num_batches( table) ,
805+ "emitted batches should be >= planned batches for table '{table}'"
738806 ) ;
739807 }
740808 }
741809
810+ #[ tokio:: test]
811+ async fn tpch_batches_are_capped_to_max_rows_per_file ( ) {
812+ let dataset = build_dataset ( 1.0 , 7 ) ;
813+
814+ let mut saw_split = false ;
815+ while let Some ( batch) = dataset
816+ . raw_next_batch ( "lineitem" )
817+ . await
818+ . expect ( "raw_next_batch should not fail" )
819+ {
820+ assert ! (
821+ batch. num_rows( ) <= DEFAULT_TPCH_MAX_ROWS_PER_FILE ,
822+ "lineitem chunk exceeded max rows per file"
823+ ) ;
824+ if batch. num_rows ( ) == DEFAULT_TPCH_MAX_ROWS_PER_FILE {
825+ saw_split = true ;
826+ }
827+ }
828+
829+ assert ! ( saw_split, "expected at least one full-size split chunk" ) ;
830+ }
831+
742832 #[ tokio:: test]
743833 async fn tpch_zero_mutation_batches_have_create_only_ops ( ) {
744834 let dataset = build_dataset ( 1.0 , 10 ) ;
0 commit comments