11// This commit pipeline is inspired by Pebble's commit pipeline.
22
3- use std:: sync:: {
4- atomic:: { AtomicBool , AtomicPtr , AtomicU64 , Ordering } ,
5- Arc , Mutex ,
6- } ;
3+ use std:: sync:: atomic:: { AtomicBool , AtomicPtr , AtomicU64 , Ordering } ;
4+ use std:: sync:: { Arc , Mutex } ;
75
86use tokio:: sync:: { oneshot, Semaphore } ;
97
10- use crate :: {
11- batch:: Batch ,
12- error:: { Error , Result } ,
13- } ;
8+ use crate :: batch:: Batch ;
9+ use crate :: error:: { Error , Result } ;
1410
1511const MAX_CONCURRENT_COMMITS : usize = 16 ;
1612const DEQUEUE_BITS : u32 = 32 ;
@@ -132,7 +128,8 @@ impl CommitQueue {
132128 self . head_tail . fetch_add ( 1 << DEQUEUE_BITS , Ordering :: Release ) ;
133129 }
134130
135- // Multi-consumer dequeue - removes the earliest enqueued Batch, if it is applied
131+ // Multi-consumer dequeue - removes the earliest enqueued Batch, if it is
132+ // applied
136133 fn dequeue_applied ( & self ) -> Option < Arc < CommitBatch > > {
137134 loop {
138135 let ptrs = self . head_tail . load ( Ordering :: Acquire ) ;
@@ -210,8 +207,8 @@ impl CommitPipeline {
210207 }
211208
212209 /// Synchronous commit that bypasses the async pipeline machinery.
213- /// This is suitable for cases where no flow control or async coordination is needed,
214- /// such as delete list updates during compaction.
210+ /// This is suitable for cases where no flow control or async coordination
211+ /// is needed, such as delete list updates during compaction.
215212 pub ( crate ) fn sync_commit ( & self , batch : Batch , sync_wal : bool ) -> Result < ( ) > {
216213 if self . shutdown . load ( Ordering :: Acquire ) {
217214 return Err ( Error :: PipelineStall ) ;
@@ -372,7 +369,7 @@ mod tests {
372369 use std:: time:: Duration ;
373370
374371 use crate :: sstable:: InternalKeyKind ;
375- use crate :: spawn :: spawn;
372+ use crate :: util :: spawn;
376373
377374 use super :: * ;
378375
@@ -435,11 +432,14 @@ mod tests {
435432 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
436433 async fn test_concurrent_commits ( ) {
437434 let pipeline = CommitPipeline :: new ( Arc :: new ( MockEnv ) ) ;
435+ let num_tasks = 10 ;
436+ let ( result_sender, result_receiver) = async_channel:: bounded ( num_tasks) ;
438437
439- let mut handles = vec ! [ ] ;
440- for i in 0 ..10 {
438+ // Spawn concurrent commit tasks
439+ for i in 0 ..num_tasks {
441440 let pipeline = pipeline. clone ( ) ;
442- let handle = spawn ( async move {
441+ let sender = result_sender. clone ( ) ;
442+ spawn ( async move {
443443 let mut batch = Batch :: new ( ) ;
444444 batch
445445 . add_record (
@@ -448,13 +448,20 @@ mod tests {
448448 Some ( & [ 1 , 2 , 3 ] ) ,
449449 )
450450 . unwrap ( ) ;
451- pipeline. commit ( batch, false ) . await
451+ let result = pipeline. commit ( batch, false ) . await ;
452+ let _ = sender. send ( ( i, result) ) . await ;
452453 } ) ;
453- handles. push ( handle) ;
454454 }
455455
456- for ( i, handle) in handles. into_iter ( ) . enumerate ( ) {
457- let result = handle. await . unwrap ( ) ;
456+ // Collect all results
457+ let mut results = Vec :: new ( ) ;
458+ for _ in 0 ..num_tasks {
459+ let ( i, result) = result_receiver. recv ( ) . await . unwrap ( ) ;
460+ results. push ( ( i, result) ) ;
461+ }
462+
463+ // Verify all commits succeeded
464+ for ( i, result) in results {
458465 assert ! ( result. is_ok( ) , "Commit {i} failed: {result:?}" ) ;
459466 }
460467
@@ -494,11 +501,14 @@ mod tests {
494501 #[ tokio:: test( flavor = "multi_thread" ) ]
495502 async fn test_concurrent_commits_with_delays ( ) {
496503 let pipeline = CommitPipeline :: new ( Arc :: new ( DelayedMockEnv ) ) ;
504+ let num_tasks = 5 ;
505+ let ( result_sender, result_receiver) = async_channel:: bounded ( 5 ) ;
497506
498- let mut handles = vec ! [ ] ;
499- for i in 0 ..5 {
507+ // Spawn concurrent commit tasks
508+ for i in 0 ..num_tasks {
500509 let pipeline = pipeline. clone ( ) ;
501- let handle = spawn ( async move {
510+ let sender = result_sender. clone ( ) ;
511+ spawn ( async move {
502512 let mut batch = Batch :: new ( ) ;
503513 batch
504514 . add_record (
@@ -507,17 +517,19 @@ mod tests {
507517 Some ( & [ 1 , 2 , 3 ] ) ,
508518 )
509519 . unwrap ( ) ;
510- pipeline. commit ( batch, false ) . await
520+ let result = pipeline. commit ( batch, false ) . await ;
521+ let _ = sender. send ( result) . await ;
511522 } ) ;
512- handles. push ( handle) ;
513523 }
514524
515- for handle in handles {
516- assert ! ( handle. await . unwrap( ) . is_ok( ) ) ;
525+ // Collect all results
526+ for _ in 0 ..num_tasks {
527+ let res = result_receiver. recv ( ) . await . unwrap ( ) ;
528+ assert ! ( res. is_ok( ) ) ;
517529 }
518530
519531 // Verify sequence numbers are published correctly
520- assert_eq ! ( pipeline. get_visible_seq_num( ) , 5 ) ;
532+ assert_eq ! ( pipeline. get_visible_seq_num( ) , num_tasks ) ;
521533
522534 // Shutdown the pipeline
523535 pipeline. shutdown ( ) ;
0 commit comments