@@ -50,6 +50,10 @@ impl CellStorage {
50
50
StoreContext :: new ( & self . db , & self . raw_cells_cache , capacity)
51
51
}
52
52
53
+ pub fn create_remove_ctx ( & self ) -> RemoveContext {
54
+ RemoveContext :: new ( & self . db , & self . raw_cells_cache )
55
+ }
56
+
53
57
pub fn apply_temp_cell ( & self , root : & HashBytes ) -> Result < ( ) > {
54
58
const MAX_NEW_CELLS_BATCH_SIZE : usize = 10000 ;
55
59
@@ -554,6 +558,148 @@ impl StoreContext {
554
558
}
555
559
}
556
560
561
+ #[ derive( Clone ) ]
562
+ struct RemovedCell {
563
+ old_rc : i64 ,
564
+ removes : u32 ,
565
+ refs : Vec < HashBytes > ,
566
+ }
567
+
568
+ impl < ' a > RemovedCell {
569
+ fn remove ( & ' a mut self ) -> Result < Option < & ' a [ HashBytes ] > , CellStorageError > {
570
+ self . removes += 1 ;
571
+ if self . removes as i64 <= self . old_rc {
572
+ Ok ( self . next_refs ( ) )
573
+ } else {
574
+ Err ( CellStorageError :: CounterMismatch )
575
+ }
576
+ }
577
+
578
+ fn next_refs ( & ' a self ) -> Option < & ' a [ HashBytes ] > {
579
+ if self . old_rc > self . removes as i64 {
580
+ None
581
+ } else {
582
+ Some ( & self . refs )
583
+ }
584
+ }
585
+ }
586
+
587
+ pub struct RemoveContext {
588
+ db : CellsDb ,
589
+ raw_cache : Arc < RawCellsCache > ,
590
+ transaction : FastDashMap < HashBytes , RemovedCell > ,
591
+ }
592
+
593
+ impl RemoveContext {
594
+ fn new ( db : & CellsDb , raw_cache : & Arc < RawCellsCache > ) -> Self {
595
+ Self {
596
+ db : db. clone ( ) ,
597
+ raw_cache : raw_cache. clone ( ) ,
598
+ transaction : FastDashMap :: with_capacity_and_hasher_and_shard_amount (
599
+ 128 ,
600
+ Default :: default ( ) ,
601
+ 512 ,
602
+ ) ,
603
+ }
604
+ }
605
+
606
+ pub fn len ( & self ) -> usize {
607
+ self . transaction . len ( )
608
+ }
609
+
610
+ pub fn remove_cell ( & self , hash : & HashBytes ) -> Result < ( ) , CellStorageError > {
611
+ let mut stack = Vec :: with_capacity ( 16 ) ;
612
+ stack. push ( vec ! [ * hash] ) ;
613
+
614
+ let mut buffer = Vec :: with_capacity ( 4 ) ;
615
+
616
+ // While some cells left
617
+ ' outer: loop {
618
+ let Some ( iter) = stack. last_mut ( ) else {
619
+ break ;
620
+ } ;
621
+
622
+ for cell_id in iter. iter ( ) {
623
+ // Process the current cell.
624
+ let refs = match self . transaction . entry ( * cell_id) {
625
+ Entry :: Occupied ( mut v) => v. get_mut ( ) . remove ( ) ?. map ( |v| v. to_vec ( ) ) ,
626
+ Entry :: Vacant ( v) => {
627
+ let old_rc =
628
+ self . raw_cache
629
+ . get_rc_for_delete ( & self . db , cell_id, & mut buffer) ?;
630
+ debug_assert ! ( old_rc > 0 ) ;
631
+
632
+ v. insert ( RemovedCell {
633
+ old_rc,
634
+ removes : 1 ,
635
+ refs : buffer. clone ( ) ,
636
+ } )
637
+ . next_refs ( )
638
+ . map ( |v| v. to_vec ( ) )
639
+ }
640
+ } ;
641
+
642
+ if let Some ( refs) = refs {
643
+ // And proceed to its refs if any.
644
+ stack. push ( refs) ;
645
+ continue ' outer;
646
+ }
647
+ }
648
+
649
+ // Drop the current cell when all of its children were processed.
650
+ stack. pop ( ) ;
651
+ }
652
+
653
+ // Clear big chunks of data before finalization
654
+ drop ( stack) ;
655
+
656
+ Ok ( ( ) )
657
+ }
658
+
659
+ pub fn finalize ( self , batch : & mut WriteBatch ) -> usize {
660
+ std:: thread:: scope ( |s| {
661
+ let number_shards = self . transaction . _shard_count ( ) ;
662
+ // safety: we hold only read locks
663
+ let shards = unsafe { ( 0 ..number_shards) . map ( |i| self . transaction . _get_read_shard ( i) ) } ;
664
+ let cache = & self . raw_cache ;
665
+
666
+ // todo: clamp to number of cpus x2
667
+ for shard in shards {
668
+ // spawned threads will be joined at the end of the scope, so we don't need to store them
669
+ s. spawn ( move || {
670
+ for ( key, item) in shard {
671
+ let value = item. get ( ) ;
672
+
673
+ let new_rc = value. old_rc - value. removes as i64 ;
674
+ cache. on_remove_cell ( key, new_rc) ;
675
+ }
676
+ } ) ;
677
+ }
678
+
679
+ let batch_update = s. spawn ( || {
680
+ let cells_cf = & self . db . cells . cf ( ) ;
681
+
682
+ let total = self . transaction . len ( ) ;
683
+
684
+ for kv in self . transaction . iter ( ) {
685
+ let key = kv. key ( ) ;
686
+ let value = kv. value ( ) ;
687
+
688
+ batch. merge_cf (
689
+ cells_cf,
690
+ key. as_slice ( ) ,
691
+ refcount:: encode_negative_refcount ( value. removes ) ,
692
+ ) ;
693
+ }
694
+
695
+ total
696
+ } ) ;
697
+
698
+ batch_update. join ( ) . expect ( "thread panicked" )
699
+ } )
700
+ }
701
+ }
702
+
557
703
#[ derive( thiserror:: Error , Debug ) ]
558
704
pub enum CellStorageError {
559
705
#[ error( "Cell not found in cell db" ) ]
0 commit comments