@@ -33,7 +33,7 @@ use tracing::{debug, warn};
3333/// Configuration for an `Any` authenticated db.
3434#[ derive( Clone ) ]
3535pub struct Config {
36- /// The name of the `Storage` partition used for the MMR's backing journal.
36+ /// The name of the `Storage` ˘ used for the MMR's backing journal.
3737 pub mmr_journal_partition : String ,
3838
3939 /// The items per blob configuration value used by the MMR journal.
@@ -69,8 +69,9 @@ pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> {
6969 /// are over keys that have been updated by some operation at or after this point).
7070 inactivity_floor_loc : u64 ,
7171
72- /// A map from each key to the location in the log containing its most recent update. Only
73- /// contains the keys that currently have a value (that is, deleted keys are not in the map).
72+ /// A snapshot of all currently active operations in the form of a map from each key to the
73+ /// location in the log containing its most recent update. Only contains the keys that currently
74+ /// have a value (that is, deleted keys are not in the map).
7475 snapshot : Index < EightCap , u64 > ,
7576
7677 /// The number of operations that are pending commit.
@@ -79,7 +80,9 @@ pub struct Any<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> {
7980
8081impl < E : RStorage + Clock + Metrics , K : Array , V : Array , H : CHasher > Any < E , K , V , H > {
8182 /// Return an MMR initialized from `cfg`. Any uncommitted operations in the log will be
82- /// discarded and the state of the db will be as of the last committed operation.
83+ /// discarded and the state of the db will be as of the last committed operation. If a bitmap is
84+ /// provided, then a bit is appended for each operation in the operation log, with its value
85+ /// appropriately set to its activity status.
8386 pub async fn init < const N : usize > (
8487 context : E ,
8588 hasher : & mut H ,
@@ -167,6 +170,9 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
167170 while let Some ( loc) = loc_iter. next ( ) {
168171 let op = log. read ( * loc) . await ?;
169172 if op. to_key ( ) == key {
173+ if let Some ( ref mut bitmap_ref) = bitmap {
174+ bitmap_ref. set_bit ( hasher, * loc, false ) ;
175+ }
170176 loc_iter. remove ( ) ;
171177 break ;
172178 }
@@ -186,6 +192,13 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
186192 }
187193 Type :: Commit ( loc) => inactivity_floor_loc = loc,
188194 }
195+ if let Some ( ref mut bitmap_ref) = bitmap {
196+ // If we reach this point and a bit hasn't been added for the operation, then it's
197+ // an inactive operation and we need to tag it as such in the bitmap.
198+ if bitmap_ref. bit_count ( ) == i {
199+ bitmap_ref. append ( hasher, false ) ;
200+ }
201+ }
189202 }
190203
191204 let db = Any {
@@ -276,7 +289,7 @@ impl<E: RStorage + Clock + Metrics, K: Array, V: Array, H: CHasher> Any<E, K, V,
276289 /// Updates `key` to have value `value`. If the key already has this same value, then this is a
277290 /// no-op. The operation is reflected in the snapshot, but will be subject to rollback until the
278291 /// next successful `commit`. Returns None if the update was a no-op, and otherwise returns the
279- /// ( old_position, new_position) pair for the updated key.
292+ /// old_position (if any) for the updated key.
280293 pub async fn update (
281294 & mut self ,
282295 hasher : & mut H ,
@@ -574,12 +587,17 @@ mod test {
574587 use commonware_cryptography:: { hash, sha256:: Digest , Hasher as CHasher , Sha256 } ;
575588 use commonware_macros:: test_traced;
576589 use commonware_runtime:: { deterministic, Runner as _} ;
577- use std:: collections:: HashMap ;
590+ use rand:: {
591+ rngs:: { OsRng , StdRng } ,
592+ RngCore , SeedableRng ,
593+ } ;
594+ use std:: collections:: { HashMap , HashSet } ;
578595
579596 /// Return an `Any` database initialized with a fixed config.
580597 async fn open_db < E : RStorage + Clock + Metrics , const N : usize > (
581598 context : E ,
582599 hasher : & mut Sha256 ,
600+ bitmap : Option < & mut Bitmap < Sha256 , N > > ,
583601 ) -> Any < E , Digest , Digest , Sha256 > {
584602 let cfg = Config {
585603 mmr_journal_partition : "journal_partition" . into ( ) ,
@@ -588,7 +606,7 @@ mod test {
588606 log_journal_partition : "log_journal_partition" . into ( ) ,
589607 log_items_per_blob : 7 ,
590608 } ;
591- Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, None )
609+ Any :: < E , Digest , Digest , Sha256 > :: init :: < N > ( context, hasher, cfg, bitmap )
592610 . await
593611 . unwrap ( )
594612 }
@@ -598,7 +616,7 @@ mod test {
598616 let executor = deterministic:: Runner :: default ( ) ;
599617 executor. start ( |context| async move {
600618 let mut hasher = Sha256 :: new ( ) ;
601- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
619+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
602620 assert_eq ! ( db. op_count( ) , 0 ) ;
603621 assert_eq ! ( db. oldest_retained_loc( ) , None ) ;
604622 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
@@ -610,22 +628,25 @@ mod test {
610628 let root = db. root ( & mut hasher) ;
611629 db. update ( & mut hasher, d1, d2) . await . unwrap ( ) ;
612630 db. close ( ) . await . unwrap ( ) ;
613- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
631+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
614632 assert_eq ! ( db. root( & mut hasher) , root) ;
615633 assert_eq ! ( db. op_count( ) , 0 ) ;
616634
617635 // Test calling commit on an empty db which should make it (durably) non-empty.
618- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
636+ let mut old_locs = Vec :: new ( ) ;
637+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
619638 assert_eq ! ( db. op_count( ) , 1 ) ; // floor op added
639+ assert ! ( old_locs. is_empty( ) ) ;
620640 let root = db. root ( & mut hasher) ;
621641 assert ! ( matches!( db. prune_inactive( ) . await , Ok ( ( ) ) ) ) ;
622- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
642+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
623643 assert_eq ! ( db. root( & mut hasher) , root) ;
624644
625645 // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
626646 for _ in 1 ..100 {
627- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
647+ db. commit ( & mut hasher, Some ( & mut old_locs ) ) . await . unwrap ( ) ;
628648 assert_eq ! ( db. op_count( ) - 1 , db. inactivity_floor_loc) ;
649+ assert ! ( old_locs. is_empty( ) ) ;
629650 }
630651 } ) ;
631652 }
@@ -637,7 +658,7 @@ mod test {
637658 // Build a db with 2 keys and make sure updates and deletions of those keys work as
638659 // expected.
639660 let mut hasher = Sha256 :: new ( ) ;
640- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
661+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
641662
642663 let d1 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 1u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
643664 let d2 = <Sha256 as CHasher >:: Digest :: decode ( vec ! [ 2u8 ; 32 ] . as_ref ( ) ) . unwrap ( ) ;
@@ -681,9 +702,11 @@ mod test {
681702 db. sync ( ) . await . unwrap ( ) ;
682703
683704 // Advance over 3 inactive operations.
684- db. raise_inactivity_floor ( & mut hasher, 3 , None )
705+ let mut old_locs = Vec :: new ( ) ;
706+ db. raise_inactivity_floor ( & mut hasher, 3 , Some ( & mut old_locs) )
685707 . await
686708 . unwrap ( ) ;
709+ assert ! ( old_locs. is_empty( ) ) ; // no active ops were moved
687710 assert_eq ! ( db. inactivity_floor_loc, 3 ) ;
688711 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 6 ) ; // 4 updates, 1 deletion, 1 commit
689712
@@ -718,11 +741,12 @@ mod test {
718741 assert_eq ! ( db. root( & mut hasher) , root) ;
719742
720743 // Make sure closing/reopening gets us back to the same state.
721- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
744+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
745+ assert ! ( old_locs. is_empty( ) ) ;
722746 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
723747 let root = db. root ( & mut hasher) ;
724748 db. close ( ) . await . unwrap ( ) ;
725- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
749+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
726750 assert_eq ! ( db. log. size( ) . await . unwrap( ) , 9 ) ;
727751 assert_eq ! ( db. root( & mut hasher) , root) ;
728752
@@ -745,7 +769,7 @@ mod test {
745769 db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
746770 let root = db. root ( & mut hasher) ;
747771 db. close ( ) . await . unwrap ( ) ;
748- let mut db = open_db :: < _ , 32 > ( context, & mut hasher) . await ;
772+ let mut db = open_db :: < _ , 32 > ( context, & mut hasher, None ) . await ;
749773 assert_eq ! ( db. root( & mut hasher) , root) ;
750774
751775 // Commit will raise the inactivity floor, which won't affect state but will affect the
@@ -768,12 +792,12 @@ mod test {
768792 let executor = deterministic:: Runner :: default ( ) ;
769793 // Build a db with 1000 keys, some of which we update and some of which we delete, and
770794 // confirm that the end state of the db matches that of an identically updated hashmap.
795+ const ELEMENTS : u64 = 1000 ;
771796 executor. start ( |context| async move {
772797 let mut hasher = Sha256 :: new ( ) ;
773- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
798+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
774799
775800 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
776- const ELEMENTS : u64 = 1000 ;
777801 for i in 0u64 ..ELEMENTS {
778802 let k = hash ( & i. to_be_bytes ( ) ) ;
779803 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
@@ -809,15 +833,17 @@ mod test {
809833 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
810834
811835 // Test that commit will raise the activity floor.
812- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
836+ let mut old_locs = Vec :: new ( ) ;
837+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
813838 assert_eq ! ( db. op_count( ) , 2336 ) ;
814839 assert_eq ! ( db. oldest_retained_loc( ) . unwrap( ) , 1478 ) ;
815840 assert_eq ! ( db. snapshot. len( ) , 857 ) ;
841+ assert_eq ! ( old_locs. len( ) , 858 ) ; // 857 active ops moved once, one moved twice.
816842
817843 // Close & reopen the db, making sure the re-opened db has exactly the same state.
818844 let root_hash = db. root ( & mut hasher) ;
819845 db. close ( ) . await . unwrap ( ) ;
820- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
846+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
821847 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
822848 assert_eq ! ( db. op_count( ) , 2336 ) ;
823849 assert_eq ! ( db. inactivity_floor_loc, 1478 ) ;
@@ -877,7 +903,7 @@ mod test {
877903 let executor = deterministic:: Runner :: default ( ) ;
878904 executor. start ( |context| async move {
879905 let mut hasher = Sha256 :: new ( ) ;
880- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
906+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
881907
882908 // Insert 1000 keys then sync.
883909 const ELEMENTS : u64 = 1000 ;
@@ -904,7 +930,7 @@ mod test {
904930
905931 // Journaled MMR recovery should read the orphaned leaf & its parents, then log
906932 // replaying will restore the rest.
907- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
933+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
908934 assert_eq ! ( db. root( & mut hasher) , root) ;
909935
910936 // Write some additional nodes, simulate failed log commit, and test we recover to the previous commit point.
@@ -914,7 +940,7 @@ mod test {
914940 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
915941 }
916942 db. simulate_failed_commit_log ( & mut hasher) . await . unwrap ( ) ;
917- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
943+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
918944 assert_eq ! ( db. root( & mut hasher) , root) ;
919945 } ) ;
920946 }
@@ -926,7 +952,7 @@ mod test {
926952 let executor = deterministic:: Runner :: default ( ) ;
927953 executor. start ( |context| async move {
928954 let mut hasher = Sha256 :: new ( ) ;
929- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
955+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
930956
931957 // Update the same key many times.
932958 const UPDATES : u64 = 100 ;
@@ -935,12 +961,15 @@ mod test {
935961 let v = hash ( & ( i * 1000 ) . to_be_bytes ( ) ) ;
936962 db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
937963 }
938- db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
964+ let mut old_locs = Vec :: new ( ) ;
965+ db. commit ( & mut hasher, Some ( & mut old_locs) ) . await . unwrap ( ) ;
966+ // only 1 active op, but it gets moved twice due to the commit op.
967+ assert_eq ! ( old_locs. len( ) , 2 ) ;
939968 let root = db. root ( & mut hasher) ;
940969 db. close ( ) . await . unwrap ( ) ;
941970
942971 // Simulate a failed commit and test that the log replay doesn't leave behind old data.
943- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
972+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
944973 let iter = db. snapshot . get_iter ( & k) ;
945974 assert_eq ! ( iter. cloned( ) . collect:: <Vec <_>>( ) . len( ) , 1 ) ;
946975 assert_eq ! ( db. root( & mut hasher) , root) ;
@@ -952,7 +981,7 @@ mod test {
952981 let executor = deterministic:: Runner :: default ( ) ;
953982 executor. start ( |context| async move {
954983 let mut hasher = Sha256 :: new ( ) ;
955- let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
984+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
956985
957986 let mut map = HashMap :: < Digest , Digest > :: default ( ) ;
958987 const ELEMENTS : u64 = 10 ;
@@ -977,9 +1006,89 @@ mod test {
9771006 // Close & reopen the db, making sure the re-opened db has exactly the same state.
9781007 let root_hash = db. root ( & mut hasher) ;
9791008 db. close ( ) . await . unwrap ( ) ;
980- let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher) . await ;
1009+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
9811010 assert_eq ! ( root_hash, db. root( & mut hasher) ) ;
9821011 assert ! ( db. get( & k) . await . unwrap( ) . is_none( ) ) ;
9831012 } ) ;
9841013 }
1014+
1015+ /// This test builds a random database which we then commit, close, and reopen with a bitmap. We
1016+ /// then check that the bitmap state matches that of the database's active operations.
1017+ #[ test_traced( "WARN" ) ]
1018+ pub fn test_any_restore_with_bitmap ( ) {
1019+ // Number of elements to initially insert into the db.
1020+ const ELEMENTS : u64 = 1000 ;
1021+
1022+ // Use a non-deterministic rng seed to ensure each run is different.
1023+ let rng_seed = OsRng . next_u64 ( ) ;
1024+ // Log the seed with high visibility to make failures reproducible.
1025+ warn ! ( "rng_seed={}" , rng_seed) ;
1026+ let mut rng = StdRng :: seed_from_u64 ( rng_seed) ;
1027+
1028+ let executor = deterministic:: Runner :: default ( ) ;
1029+ executor. start ( |context| async move {
1030+ let mut hasher = Sha256 :: new ( ) ;
1031+ let mut db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, None ) . await ;
1032+
1033+ for i in 0u64 ..ELEMENTS {
1034+ let k = hash ( & i. to_be_bytes ( ) ) ;
1035+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1036+ db. update ( & mut hasher, k, v) . await . unwrap ( ) ;
1037+ }
1038+
1039+ // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
1040+ // frequency.
1041+ for _ in 0u64 ..ELEMENTS * 10 {
1042+ let rand_key = hash ( & ( rng. next_u64 ( ) % ELEMENTS ) . to_be_bytes ( ) ) ;
1043+ if rng. next_u32 ( ) % 7 == 0 {
1044+ db. delete ( & mut hasher, rand_key) . await . unwrap ( ) ;
1045+ continue ;
1046+ }
1047+ let v = hash ( & rng. next_u32 ( ) . to_be_bytes ( ) ) ;
1048+ db. update ( & mut hasher, rand_key, v) . await . unwrap ( ) ;
1049+ if rng. next_u32 ( ) % 20 == 0 {
1050+ // Commit every ~20 updates.
1051+ db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
1052+ }
1053+ }
1054+ db. commit ( & mut hasher, None ) . await . unwrap ( ) ;
1055+
1056+ // Create a bitmap based on the current db's pruned/inactive state.
1057+ let mut bitmap = Bitmap :: < _ , 32 > :: new ( ) ;
1058+ for _ in 0 ..db. inactivity_floor_loc {
1059+ bitmap. append ( & mut hasher, false ) ;
1060+ }
1061+ assert_eq ! ( bitmap. bit_count( ) , db. inactivity_floor_loc) ;
1062+
1063+ // Close the db, then re-open it with a bitmap.
1064+ let root = db. root ( & mut hasher) ;
1065+ db. close ( ) . await . unwrap ( ) ;
1066+ let db = open_db :: < _ , 32 > ( context. clone ( ) , & mut hasher, Some ( & mut bitmap) ) . await ;
1067+ assert_eq ! ( db. root( & mut hasher) , root) ;
1068+
1069+ // Check the bitmap state matches that of the snapshot.
1070+ let items = db. log . size ( ) . await . unwrap ( ) ;
1071+ assert_eq ! ( bitmap. bit_count( ) , items) ;
1072+ let mut active_positions = HashSet :: new ( ) ;
1073+ for pos in db. inactivity_floor_loc ..items {
1074+ let item = db. log . read ( pos) . await . unwrap ( ) ;
1075+ let iter = db. snapshot . get_iter ( & item. to_key ( ) ) ;
1076+ let mut active = false ;
1077+ for loc in iter {
1078+ if * loc == pos {
1079+ // Found an active op.
1080+ active_positions. insert ( pos) ;
1081+ active = true ;
1082+ break ;
1083+ }
1084+ }
1085+ assert_eq ! ( bitmap. get_bit( pos) , active, "position {} should match" , pos) ;
1086+ }
1087+ for pos in db. inactivity_floor_loc ..items {
1088+ if !active_positions. contains ( & pos) {
1089+ assert ! ( !bitmap. get_bit( pos) ) ;
1090+ }
1091+ }
1092+ } ) ;
1093+ }
9851094}
0 commit comments