@@ -33,7 +33,7 @@ use tracing::{debug, warn};
3333/// Configuration for an `Any` authenticated db.
3434#[ derive( Clone ) ]
3535pub struct Config {
36- /// The name of the `Storage` partition used for the MMR's backing journal.
36+ /// The name of the `Storage` ˘ used for the MMR's backing journal.
3737 pub mmr_journal_partition : String ,
3838
3939 /// The items per blob configuration value used by the MMR journal.
@@ -69,8 +69,9 @@ pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> {
6969 /// are over keys that have been updated by some operation at or after this point).
7070 inactivity_floor_loc : u64 ,
7171
72- /// A map from each key to the location in the log containing its most recent update. Only
73- /// contains the keys that currently have a value (that is, deleted keys are not in the map).
72+ /// A snapshot of all currently active operations in the form of a map from each key to the
73+ /// location in the log containing its most recent update. Only contains the keys that currently
74+ /// have a value (that is, deleted keys are not in the map).
7475 snapshot : Index < EightCap , u64 > ,
7576
7677 /// The number of operations that are pending commit.
@@ -186,6 +187,13 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
186187 }
187188 Type :: Commit ( loc) => inactivity_floor_loc = loc,
188189 }
190+ if let Some ( ref mut bitmap_ref) = bitmap {
191+ // If we reach this point and a bit hasn't been added for the operation, then it's
192+ // an inactive operation and we need to tag it as such in the bitmap.
193+ if bitmap_ref. bit_count ( ) == i {
194+ bitmap_ref. append ( hasher, false ) ;
195+ }
196+ }
189197 }
190198
191199 let db = Any {
@@ -276,7 +284,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
276284 /// Updates `key` to have value `value`. If the key already has this same value, then this is a
277285 /// no-op. The operation is reflected in the snapshot, but will be subject to rollback until the
278286 /// next successful `commit`. Returns None if the update was a no-op, and otherwise returns the
279- /// ( old_position, new_position) pair for the updated key.
287+ /// old_position (if any) for the updated key.
280288 pub async fn update (
281289 & mut self ,
282290 hasher : & mut H ,
@@ -574,12 +582,17 @@ mod test {
574582 use commonware_cryptography:: { hash, sha256:: Digest , Hasher as CHasher , Sha256 } ;
575583 use commonware_macros:: test_traced;
576584 use commonware_runtime:: { deterministic, Runner as _} ;
577- use std:: collections:: HashMap ;
585+ use rand:: {
586+ rngs:: { OsRng , StdRng } ,
587+ RngCore , SeedableRng ,
588+ } ;
589+ use std:: collections:: { HashMap , HashSet } ;
578590
579591 /// Return an `Any` database initialized with a fixed config.
580592 async fn open_db < E : RStorage + Clock + Metrics , const N : usize > (
581593 context : E ,
582594 hasher : & mut Sha256 ,
595+ bitmap : Option < & mut Bitmap < Sha256 , N > > ,
583596 ) -> Any < E , Digest , Digest , Sha256 > {
584597 let cfg = Config {
585598 mmr_journal_partition : "journal_partition" . into ( ) ,
@@ -588,7 +601,7 @@ mod test {
588601 log_journal_partition : "log_journal_partition" . into ( ) ,
589602 log_items_per_blob : 7 ,
590603 } ;
591- Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, None )
604+ Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, bitmap )
592605 . await
593606 . unwrap ( )
594607 }
@@ -598,7 +611,7 @@ mod test {
598611 let executor = deterministic:: Runner :: default ( ) ;
599612 executor. start ( |context| async move {
600613 let mut hasher = Sha256 :: new ( ) ;
601- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
614+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
602615 assert_eq ! ( db. op_count( ) , 0 ) ;
603616 assert_eq ! ( db. oldest_retained_loc( ) , None ) ;
604617 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
@@ -610,22 +623,25 @@ mod test {
610623 let root = db. root ( & mut hasher) ;
611624 db. update ( & mut hasher, d1, d2) . await . unwrap ( ) ;
612625 db. close ( ) . await . unwrap ( ) ;
613- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
626+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
614627 assert_eq ! ( db. root( & mut hasher) , root) ;
615628 assert_eq ! ( db. op_count( ) , 0 ) ;
616629
617630 // Test calling commit on an empty db which should make it (durably) non-empty.
618- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
631+ let mut old_locs = Vec :: new ( ) ;
632+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
619633 assert_eq ! ( db. op_count( ) , 1 ) ; // floor op added
634+ assert ! ( old_locs. is_empty( ) ) ;
620635 let root = db. root ( & mut hasher) ;
621636 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
622- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
637+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
623638 assert_eq ! ( db. root( & mut hasher) , root) ;
624639
625640 // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
626641 for _ in 1 ..100 {
627- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
642+ db. commit ( & mut hasher, Some ( & mut old_locs ) ) . await . unwrap ( ) ;
628643 assert_eq ! ( db. op_count( ) - 1 , db. inactivity_floor_loc) ;
644+ assert ! ( old_locs. is_empty( ) ) ;
629645 }
630646 } ) ;
631647 }
@@ -637,7 +653,7 @@ mod test {
637653 // Build a db with 2 keys and make sure updates and deletions of those keys work as
638654 // expected.
639655 let mut hasher = Sha256 :: new ( ) ;
640- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
656+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
641657
642658 let d1 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 1u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
643659 let d2 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 2u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
@@ -681,9 +697,11 @@ mod test {
681697 db. sync ( ) . await . unwrap ( ) ;
682698
683699 // Advance over 3 inactive operations.
684- db. raise_inactivity_floor ( & mut hasher, 3 , None )
700+ let mut old_locs = Vec :: new ( ) ;
701+ db. raise_inactivity_floor ( & mut hasher, 3 , Some ( & mut old_locs) )
685702 . await
686703 . unwrap ( ) ;
704+ assert ! ( old_locs. is_empty( ) ) ; // no active ops were moved
687705 assert_eq ! ( db. inactivity_floor_loc, 3 ) ;
688706 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 6 ) ; // 4 updates, 1 deletion, 1 commit
689707
@@ -718,11 +736,12 @@ mod test {
718736 assert_eq ! ( db. root( & mut hasher) , root) ;
719737
720738 // Make sure closing/reopening gets us back to the same state.
721- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
739+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
740+ assert ! ( old_locs. is_empty( ) ) ;
722741 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
723742 let root = db. root ( & mut hasher) ;
724743 db. close ( ) . await . unwrap ( ) ;
725- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
744+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
726745 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
727746 assert_eq ! ( db. root( & mut hasher) , root) ;
728747
@@ -745,7 +764,7 @@ mod test {
745764 db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
746765 let root = db. root ( & mut hasher) ;
747766 db. close ( ) . await . unwrap ( ) ;
748- let mut db = open_db :: < _ , 32 > ( context, & mut hasher) . await ;
767+ let mut db = open_db :: < _ , 32 > ( context, & mut hasher, None ) . await ;
749768 assert_eq ! ( db. root( & mut hasher) , root) ;
750769
751770 // Commit will raise the inactivity floor, which won't affect state but will affect the
@@ -768,12 +787,12 @@ mod test {
768787 let executor = deterministic:: Runner :: default ( ) ;
769788 // Build a db with 1000 keys, some of which we update and some of which we delete, and
770789 // confirm that the end state of the db matches that of an identically updated hashmap.
790+ const ELEMENTS : u64 = 1000 ;
771791 executor. start ( |context| async move {
772792 let mut hasher = Sha256 :: new ( ) ;
773- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
793+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
774794
775795 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
776- const ELEMENTS : u64 = 1000 ;
777796 for i in 0u64 ..ELEMENTS {
778797 let k = hash ( & i. to_be_bytes ( ) ) ;
779798 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
@@ -809,15 +828,17 @@ mod test {
809828 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
810829
811830 // Test that commit will raise the activity floor.
812- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
831+ let mut old_locs = Vec :: new ( ) ;
832+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
813833 assert_eq ! ( db. op_count( ) , 2336 ) ;
814834 assert_eq ! ( db. oldest_retained_loc( ) . unwrap( ) , 1478 ) ;
815835 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
836+ assert_eq ! ( old_locs. len( ) , 858 ) ; // 857 active ops moved once, one moved twice.
816837
817838 // Close & reopen the db, making sure the re-opened db has exactly the same state.
818839 let root_hash = db. root ( & mut hasher) ;
819840 db. close ( ) . await . unwrap ( ) ;
820- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
841+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
821842 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
822843 assert_eq ! ( db. op_count( ) , 2336 ) ;
823844 assert_eq ! ( db. inactivity_floor_loc, 1478 ) ;
@@ -877,7 +898,7 @@ mod test {
877898 let executor = deterministic:: Runner :: default ( ) ;
878899 executor. start ( |context| async move {
879900 let mut hasher = Sha256 :: new ( ) ;
880- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
901+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
881902
882903 // Insert 1000 keys then sync.
883904 const ELEMENTS : u64 = 1000 ;
@@ -904,7 +925,7 @@ mod test {
904925
905926 // Journaled MMR recovery should read the orphaned leaf & its parents, then log
906927 // replaying will restore the rest.
907- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
928+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
908929 assert_eq ! ( db. root( & mut hasher) , root) ;
909930
910931 // Write some additional nodes, simulate failed log commit, and test we recover to the previous commit point.
@@ -914,7 +935,7 @@ mod test {
914935 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
915936 }
916937 db. simulate_failed_commit_log ( & mut hasher) . await . unwrap ( ) ;
917- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
938+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
918939 assert_eq ! ( db. root( & mut hasher) , root) ;
919940 } ) ;
920941 }
@@ -926,7 +947,7 @@ mod test {
926947 let executor = deterministic:: Runner :: default ( ) ;
927948 executor. start ( |context| async move {
928949 let mut hasher = Sha256 :: new ( ) ;
929- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
950+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
930951
931952 // Update the same key many times.
932953 const UPDATES : u64 = 100 ;
@@ -935,12 +956,15 @@ mod test {
935956 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
936957 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
937958 }
938- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
959+ let mut old_locs = Vec :: new ( ) ;
960+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
961+ // only 1 active op, but it gets moved twice due to the commit op.
962+ assert_eq ! ( old_locs. len( ) , 2 ) ;
939963 let root = db. root ( & mut hasher) ;
940964 db. close ( ) . await . unwrap ( ) ;
941965
942966 // Simulate a failed commit and test that the log replay doesn't leave behind old data.
943- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
967+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
944968 let iter = db. snapshot . get_iter ( & k) ;
945969 assert_eq ! ( iter. cloned( ) . collect:: <Vec <_>>( ) . len( ) , 1 ) ;
946970 assert_eq ! ( db. root( & mut hasher) , root) ;
@@ -952,7 +976,7 @@ mod test {
952976 let executor = deterministic:: Runner :: default ( ) ;
953977 executor. start ( |context| async move {
954978 let mut hasher = Sha256 :: new ( ) ;
955- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
979+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
956980
957981 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
958982 const ELEMENTS : u64 = 10 ;
@@ -977,9 +1001,85 @@ mod test {
9771001 // Close & reopen the db, making sure the re-opened db has exactly the same state.
9781002 let root_hash = db. root ( & mut hasher) ;
9791003 db. close ( ) . await . unwrap ( ) ;
980- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
1004+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
9811005 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
9821006 assert ! ( db. get( & k) . await . unwrap( ) . is_none( ) ) ;
9831007 } ) ;
9841008 }
1009+
1010+ /// This test builds a random database which we then commit, close, and reopen with a bitmap. We
1011+ /// then check that the bitmap state matches that of the database's active operations.
1012+ #[ test_traced( "WARN" ) ]
1013+ pub fn test_any_restore_with_bitmap ( ) {
1014+ // Number of elements to initially insert into the db.
1015+ const ELEMENTS : u64 = 1000 ;
1016+
1017+ // Use a non-deterministic rng seed to ensure each run is different.
1018+ let rng_seed = OsRng . next_u64 ( ) ;
1019+ // Log the seed with high visibility to make failures reproducible.
1020+ warn ! ( "rng_seed={}" , rng_seed) ;
1021+ let mut rng = StdRng :: seed_from_u64 ( rng_seed) ;
1022+
1023+ let executor = deterministic:: Runner :: default ( ) ;
1024+ executor. start ( |context| async move {
1025+ let mut hasher = Sha256 :: new ( ) ;
1026+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
1027+
1028+ for i in 0u64 ..ELEMENTS {
1029+ let k = hash ( & i. to_be_bytes ( ) ) ;
1030+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1031+ db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
1032+ }
1033+
1034+ // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
1035+ // frequency.
1036+ for _ in 0u64 ..ELEMENTS * 10 {
1037+ let rand_key = hash ( & ( rng. next_u64 ( ) % ELEMENTS ) . to_be_bytes ( ) ) ;
1038+ if rng. next_u32 ( ) % 7 != 0 {
1039+ db. delete ( & mut hasher, rand_key) . await . unwrap ( ) ;
1040+ continue ;
1041+ }
1042+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1043+ db. update ( & mut hasher, rand_key, v) . await . unwrap ( ) ;
1044+ }
1045+ db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
1046+
1047+ warn ! ( "Snapshot size after updates: {}" , db. snapshot. len( ) ) ;
1048+
1049+ // Create a bitmap based on the current db's pruned/inactive state.
1050+ let mut bitmap = Bitmap :: < _ , 32 > :: new ( ) ;
1051+ for _ in 0 ..db. inactivity_floor_loc {
1052+ bitmap. append ( & mut hasher, false ) ;
1053+ }
1054+ assert_eq ! ( bitmap. bit_count( ) , db. inactivity_floor_loc) ;
1055+
1056+ // Close the db, then re-open it with a bitmap.
1057+ db. close ( ) . await . unwrap ( ) ;
1058+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, Some ( & mut bitmap) ) . await ;
1059+
1060+ // Check the bitmap state matches that of the snapshot.
1061+ let items = db. log . size ( ) . await . unwrap ( ) ;
1062+ assert_eq ! ( bitmap. bit_count( ) , items) ;
1063+ let mut active_positions = HashSet :: new ( ) ;
1064+ for pos in db. inactivity_floor_loc ..items {
1065+ let item = db. log . read ( pos) . await . unwrap ( ) ;
1066+ let iter = db. snapshot . get_iter ( & item. to_key ( ) ) ;
1067+ let mut active = false ;
1068+ for loc in iter {
1069+ if * loc == pos {
1070+ // Found an active op.
1071+ active_positions. insert ( pos) ;
1072+ active = true ;
1073+ break ;
1074+ }
1075+ }
1076+ assert_eq ! ( bitmap. get_bit( pos) , active) ;
1077+ }
1078+ for pos in db. inactivity_floor_loc ..items {
1079+ if !active_positions. contains ( & pos) {
1080+ assert ! ( !bitmap. get_bit( pos) , "position {} should be inactive" , pos) ;
1081+ }
1082+ }
1083+ } ) ;
1084+ }
9851085}
0 commit comments