@@ -69,8 +69,9 @@ pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> {
6969 /// are over keys that have been updated by some operation at or after this point).
7070 inactivity_floor_loc : u64 ,
7171
72- /// A map from each key to the location in the log containing its most recent update. Only
73- /// contains the keys that currently have a value (that is, deleted keys are not in the map).
72+ /// A snapshot of all currently active operations in the form of a map from each key to the
73+ /// location in the log containing its most recent update. Only contains the keys that currently
74+ /// have a value (that is, deleted keys are not in the map).
7475 snapshot : Index < EightCap , u64 > ,
7576
7677 /// The number of operations that are pending commit.
@@ -79,7 +80,11 @@ pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> {
7980
8081impl < E : RStorage + Clock + Metrics , K : Array , V : Array , H : CHasher > Any < E , K , V , H > {
8182 /// Return a db initialized from `cfg`. Any uncommitted operations in the log will be
82- /// discarded and the state of the db will be as of the last committed operation.
83+ /// discarded and the state of the db will be as of the last committed operation. If a bitmap is
84+ /// provided, then a bit is appended for each operation replayed from the log, with its value
85+ /// reflecting its activity status. The bitmap must already hold one bit per operation below the
86+ /// inactivity floor (this is asserted against the MMR's pruned leaf count), and this method
87+ /// will panic otherwise.
8388 pub async fn init < const N : usize > (
8489 context : E ,
8590 hasher : & mut H ,
@@ -159,6 +164,9 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
159164 let pruned_to_pos = mmr. pruned_to_pos ( ) ;
160165 let start_leaf_num = leaf_pos_to_num ( pruned_to_pos) . unwrap ( ) ;
161166
167+ if let Some ( ref bitmap) = bitmap {
168+ assert_eq ! ( start_leaf_num, bitmap. bit_count( ) ) ;
169+ }
162170 for i in start_leaf_num..log_size {
163171 let op: Operation < K , V > = log. read ( i) . await ?;
164172 match op. to_type ( ) {
@@ -167,6 +175,9 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
167175 while let Some ( loc) = loc_iter. next ( ) {
168176 let op = log. read ( * loc) . await ?;
169177 if op. to_key ( ) == key {
178+ if let Some ( ref mut bitmap_ref) = bitmap {
179+ bitmap_ref. set_bit ( hasher, * loc, false ) ;
180+ }
170181 loc_iter. remove ( ) ;
171182 break ;
172183 }
@@ -186,6 +197,13 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
186197 }
187198 Type :: Commit ( loc) => inactivity_floor_loc = loc,
188199 }
200+ if let Some ( ref mut bitmap_ref) = bitmap {
201+ // If we reach this point and a bit hasn't been added for the operation, then it's
202+ // an inactive operation and we need to tag it as such in the bitmap.
203+ if bitmap_ref. bit_count ( ) == i {
204+ bitmap_ref. append ( hasher, false ) ;
205+ }
206+ }
189207 }
190208
191209 let db = Any {
@@ -273,10 +291,10 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
273291 . map ( |pos| leaf_pos_to_num ( pos) . unwrap ( ) )
274292 }
275293
276- /// Updates `key` to have value `value`. If the key already has this same value, then this is a
294+ /// Updates `key` to have value `value`. If the key already has this same value, then this is a
277295 /// no-op. The operation is reflected in the snapshot, but will be subject to rollback until the
278296 /// next successful `commit`. Returns None if the update was a no-op, and otherwise returns the
279- /// ( old_position, new_position) pair for the updated key.
297+ /// old_position (if any) for the updated key.
280298 pub async fn update (
281299 & mut self ,
282300 hasher : & mut H ,
@@ -305,8 +323,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
305323
306324 /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
307325 /// The operation is reflected in the snapshot, but will be subject to rollback until the next
308- /// successful `commit`. Returns an (inactive_position, active_position) tuple for the key, or
309- /// none if the update was a no-op.
326+ /// successful `commit`. Returns the location of the deleted value for the key (if any).
310327 pub async fn delete ( & mut self , hasher : & mut H , key : K ) -> Result < Option < u64 > , Error > {
311328 let mut loc_iter = self . snapshot . remove_iter ( & key) ;
312329 for loc in & mut loc_iter {
@@ -406,7 +423,8 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
406423
407424 /// Commit any pending operations to the db, ensuring they are persisted to disk & recoverable
408425 /// upon return from this function. Also raises the inactivity floor according to the schedule,
409- /// and prunes those operations below it.
426+ /// and prunes those operations below it. If an old_locs vector is provided, then any locations
427+ /// that were flipped from active to inactive during this operation are added to it.
410428 pub async fn commit (
411429 & mut self ,
412430 hasher : & mut H ,
@@ -574,12 +592,17 @@ mod test {
574592 use commonware_cryptography:: { hash, sha256:: Digest , Hasher as CHasher , Sha256 } ;
575593 use commonware_macros:: test_traced;
576594 use commonware_runtime:: { deterministic, Runner as _} ;
577- use std:: collections:: HashMap ;
595+ use rand:: {
596+ rngs:: { OsRng , StdRng } ,
597+ RngCore , SeedableRng ,
598+ } ;
599+ use std:: collections:: { HashMap , HashSet } ;
578600
579601 /// Return an `Any` database initialized with a fixed config.
580602 async fn open_db < E : RStorage + Clock + Metrics , const N : usize > (
581603 context : E ,
582604 hasher : & mut Sha256 ,
605+ bitmap : Option < & mut Bitmap < Sha256 , N > > ,
583606 ) -> Any < E , Digest , Digest , Sha256 > {
584607 let cfg = Config {
585608 mmr_journal_partition : "journal_partition" . into ( ) ,
@@ -588,7 +611,7 @@ mod test {
588611 log_journal_partition : "log_journal_partition" . into ( ) ,
589612 log_items_per_blob : 7 ,
590613 } ;
591- Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, None )
614+ Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, bitmap )
592615 . await
593616 . unwrap ( )
594617 }
@@ -598,7 +621,7 @@ mod test {
598621 let executor = deterministic:: Runner :: default ( ) ;
599622 executor. start ( |context| async move {
600623 let mut hasher = Sha256 :: new ( ) ;
601- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
624+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
602625 assert_eq ! ( db. op_count( ) , 0 ) ;
603626 assert_eq ! ( db. oldest_retained_loc( ) , None ) ;
604627 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
@@ -610,22 +633,25 @@ mod test {
610633 let root = db. root ( & mut hasher) ;
611634 db. update ( & mut hasher, d1, d2) . await . unwrap ( ) ;
612635 db. close ( ) . await . unwrap ( ) ;
613- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
636+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
614637 assert_eq ! ( db. root( & mut hasher) , root) ;
615638 assert_eq ! ( db. op_count( ) , 0 ) ;
616639
617640 // Test calling commit on an empty db which should make it (durably) non-empty.
618- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
641+ let mut old_locs = Vec :: new ( ) ;
642+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
619643 assert_eq ! ( db. op_count( ) , 1 ) ; // floor op added
644+ assert ! ( old_locs. is_empty( ) ) ;
620645 let root = db. root ( & mut hasher) ;
621646 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
622- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
647+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
623648 assert_eq ! ( db. root( & mut hasher) , root) ;
624649
625650 // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
626651 for _ in 1 ..100 {
627- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
652+ db. commit ( & mut hasher, Some ( & mut old_locs ) ) . await . unwrap ( ) ;
628653 assert_eq ! ( db. op_count( ) - 1 , db. inactivity_floor_loc) ;
654+ assert ! ( old_locs. is_empty( ) ) ;
629655 }
630656 } ) ;
631657 }
@@ -637,7 +663,7 @@ mod test {
637663 // Build a db with 2 keys and make sure updates and deletions of those keys work as
638664 // expected.
639665 let mut hasher = Sha256 :: new ( ) ;
640- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
666+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
641667
642668 let d1 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 1u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
643669 let d2 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 2u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
@@ -681,9 +707,11 @@ mod test {
681707 db. sync ( ) . await . unwrap ( ) ;
682708
683709 // Advance over 3 inactive operations.
684- db. raise_inactivity_floor ( & mut hasher, 3 , None )
710+ let mut old_locs = Vec :: new ( ) ;
711+ db. raise_inactivity_floor ( & mut hasher, 3 , Some ( & mut old_locs) )
685712 . await
686713 . unwrap ( ) ;
714+ assert ! ( old_locs. is_empty( ) ) ; // no active ops were moved
687715 assert_eq ! ( db. inactivity_floor_loc, 3 ) ;
688716 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 6 ) ; // 4 updates, 1 deletion, 1 commit
689717
@@ -718,11 +746,12 @@ mod test {
718746 assert_eq ! ( db. root( & mut hasher) , root) ;
719747
720748 // Make sure closing/reopening gets us back to the same state.
721- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
749+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
750+ assert ! ( old_locs. is_empty( ) ) ;
722751 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
723752 let root = db. root ( & mut hasher) ;
724753 db. close ( ) . await . unwrap ( ) ;
725- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
754+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
726755 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
727756 assert_eq ! ( db. root( & mut hasher) , root) ;
728757
@@ -745,7 +774,7 @@ mod test {
745774 db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
746775 let root = db. root ( & mut hasher) ;
747776 db. close ( ) . await . unwrap ( ) ;
748- let mut db = open_db :: < _ , 32 > ( context, & mut hasher) . await ;
777+ let mut db = open_db :: < _ , 32 > ( context, & mut hasher, None ) . await ;
749778 assert_eq ! ( db. root( & mut hasher) , root) ;
750779
751780 // Commit will raise the inactivity floor, which won't affect state but will affect the
@@ -768,12 +797,12 @@ mod test {
768797 let executor = deterministic:: Runner :: default ( ) ;
769798 // Build a db with 1000 keys, some of which we update and some of which we delete, and
770799 // confirm that the end state of the db matches that of an identically updated hashmap.
800+ const ELEMENTS : u64 = 1000 ;
771801 executor. start ( |context| async move {
772802 let mut hasher = Sha256 :: new ( ) ;
773- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
803+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
774804
775805 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
776- const ELEMENTS : u64 = 1000 ;
777806 for i in 0u64 ..ELEMENTS {
778807 let k = hash ( & i. to_be_bytes ( ) ) ;
779808 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
@@ -809,15 +838,17 @@ mod test {
809838 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
810839
811840 // Test that commit will raise the activity floor.
812- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
841+ let mut old_locs = Vec :: new ( ) ;
842+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
813843 assert_eq ! ( db. op_count( ) , 2336 ) ;
814844 assert_eq ! ( db. oldest_retained_loc( ) . unwrap( ) , 1478 ) ;
815845 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
846+ assert_eq ! ( old_locs. len( ) , 858 ) ; // 857 active ops moved once, one moved twice.
816847
817848 // Close & reopen the db, making sure the re-opened db has exactly the same state.
818849 let root_hash = db. root ( & mut hasher) ;
819850 db. close ( ) . await . unwrap ( ) ;
820- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
851+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
821852 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
822853 assert_eq ! ( db. op_count( ) , 2336 ) ;
823854 assert_eq ! ( db. inactivity_floor_loc, 1478 ) ;
@@ -877,7 +908,7 @@ mod test {
877908 let executor = deterministic:: Runner :: default ( ) ;
878909 executor. start ( |context| async move {
879910 let mut hasher = Sha256 :: new ( ) ;
880- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
911+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
881912
882913 // Insert 1000 keys then sync.
883914 const ELEMENTS : u64 = 1000 ;
@@ -904,7 +935,7 @@ mod test {
904935
905936 // Journaled MMR recovery should read the orphaned leaf & its parents, then log
906937 // replaying will restore the rest.
907- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
938+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
908939 assert_eq ! ( db. root( & mut hasher) , root) ;
909940
910941 // Write some additional nodes, simulate failed log commit, and test we recover to the previous commit point.
@@ -914,7 +945,7 @@ mod test {
914945 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
915946 }
916947 db. simulate_failed_commit_log ( & mut hasher) . await . unwrap ( ) ;
917- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
948+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
918949 assert_eq ! ( db. root( & mut hasher) , root) ;
919950 } ) ;
920951 }
@@ -926,7 +957,7 @@ mod test {
926957 let executor = deterministic:: Runner :: default ( ) ;
927958 executor. start ( |context| async move {
928959 let mut hasher = Sha256 :: new ( ) ;
929- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
960+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
930961
931962 // Update the same key many times.
932963 const UPDATES : u64 = 100 ;
@@ -935,12 +966,15 @@ mod test {
935966 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
936967 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
937968 }
938- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
969+ let mut old_locs = Vec :: new ( ) ;
970+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
971+ // only 1 active op, but it gets moved twice due to the commit op.
972+ assert_eq ! ( old_locs. len( ) , 2 ) ;
939973 let root = db. root ( & mut hasher) ;
940974 db. close ( ) . await . unwrap ( ) ;
941975
942976 // Simulate a failed commit and test that the log replay doesn't leave behind old data.
943- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
977+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
944978 let iter = db. snapshot . get_iter ( & k) ;
945979 assert_eq ! ( iter. cloned( ) . collect:: <Vec <_>>( ) . len( ) , 1 ) ;
946980 assert_eq ! ( db. root( & mut hasher) , root) ;
@@ -952,7 +986,7 @@ mod test {
952986 let executor = deterministic:: Runner :: default ( ) ;
953987 executor. start ( |context| async move {
954988 let mut hasher = Sha256 :: new ( ) ;
955- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
989+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
956990
957991 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
958992 const ELEMENTS : u64 = 10 ;
@@ -977,9 +1011,89 @@ mod test {
9771011 // Close & reopen the db, making sure the re-opened db has exactly the same state.
9781012 let root_hash = db. root ( & mut hasher) ;
9791013 db. close ( ) . await . unwrap ( ) ;
980- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
1014+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
9811015 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
9821016 assert ! ( db. get( & k) . await . unwrap( ) . is_none( ) ) ;
9831017 } ) ;
9841018 }
1019+
1020+ /// Builds a database from random updates and deletes, commits and closes it, then reopens it with
1021+ /// a bitmap and checks that every bit at or above the inactivity floor matches the snapshot.
1022+ #[ test_traced( "WARN" ) ]
1023+ pub fn test_any_restore_with_bitmap ( ) {
1024+ // Number of elements to initially insert into the db.
1025+ const ELEMENTS : u64 = 1000 ;
1026+
1027+ // Use a non-deterministic rng seed to ensure each run is different.
1028+ let rng_seed = OsRng . next_u64 ( ) ;
1029+ // Log the seed with high visibility to make failures reproducible.
1030+ warn ! ( "rng_seed={}" , rng_seed) ;
1031+ let mut rng = StdRng :: seed_from_u64 ( rng_seed) ;
1032+
1033+ let executor = deterministic:: Runner :: default ( ) ;
1034+ executor. start ( |context| async move {
1035+ let mut hasher = Sha256 :: new ( ) ;
1036+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
1037+
1038+ for i in 0u64 ..ELEMENTS {
1039+ let k = hash ( & i. to_be_bytes ( ) ) ;
1040+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1041+ db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
1042+ }
1043+
1044+ // Randomly update / delete them. Roughly 1 in 7 iterations is a delete; every other
1045+ // iteration is an update of a randomly chosen key.
1046+ for _ in 0u64 ..ELEMENTS * 10 {
1047+ let rand_key = hash ( & ( rng. next_u64 ( ) % ELEMENTS ) . to_be_bytes ( ) ) ;
1048+ if rng. next_u32 ( ) % 7 == 0 {
1049+ db. delete ( & mut hasher, rand_key) . await . unwrap ( ) ;
1050+ continue ;
1051+ }
1052+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1053+ db. update ( & mut hasher, rand_key, v) . await . unwrap ( ) ;
1054+ if rng. next_u32 ( ) % 20 == 0 {
1055+ // Commit after roughly 1 in 20 updates.
1056+ db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
1057+ }
1058+ }
1059+ db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
1060+
1061+ // Seed the bitmap with one inactive (false) bit per operation below the inactivity floor.
1062+ let mut bitmap = Bitmap :: < _ , 32 > :: new ( ) ;
1063+ for _ in 0 ..db. inactivity_floor_loc {
1064+ bitmap. append ( & mut hasher, false ) ;
1065+ }
1066+ assert_eq ! ( bitmap. bit_count( ) , db. inactivity_floor_loc) ;
1067+
1068+ // Close the db, then re-open it with a bitmap.
1069+ let root = db. root ( & mut hasher) ;
1070+ db. close ( ) . await . unwrap ( ) ;
1071+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, Some ( & mut bitmap) ) . await ;
1072+ assert_eq ! ( db. root( & mut hasher) , root) ;
1073+
1074+ // The bitmap must cover the whole log, and each bit from the inactivity floor up must be
1075+ let items = db. log . size ( ) . await . unwrap ( ) ;
1076+ assert_eq ! ( bitmap. bit_count( ) , items) ;
1077+ let mut active_positions = HashSet :: new ( ) ;
1078+ for pos in db. inactivity_floor_loc ..items {
1079+ let item = db. log . read ( pos) . await . unwrap ( ) ;
1080+ let iter = db. snapshot . get_iter ( & item. to_key ( ) ) ;
1081+ let mut active = false ;
1082+ for loc in iter {
1083+ if * loc == pos {
1084+ // The snapshot points at this position, so the op is active.
1085+ active_positions. insert ( pos) ;
1086+ active = true ;
1087+ break ;
1088+ }
1089+ }
1090+ assert_eq ! ( bitmap. get_bit( pos) , active, "position {} should match" , pos) ;
1091+ }
1092+ for pos in db. inactivity_floor_loc ..items { // NOTE(review): re-checks inactive positions already asserted by the loop above
1093+ if !active_positions. contains ( & pos) {
1094+ assert ! ( !bitmap. get_bit( pos) ) ;
1095+ }
1096+ }
1097+ } ) ;
1098+ }
9851099}
0 commit comments