@@ -11,17 +11,38 @@ use crate::{
1111 Error as JournalError ,
1212 } ,
1313 mmr:: { iterator:: nodes_to_pin, Location , Proof } ,
14- qmdb:: { build_snapshot_from_log, operation:: Operation as OperationTrait , Error } ,
14+ qmdb:: {
15+ build_snapshot_from_log, delete_known_loc, operation:: Operation as OperationTrait ,
16+ update_known_loc, Error ,
17+ } ,
1518 Persistable ,
1619} ;
1720use commonware_codec:: { Codec , CodecShared } ;
1821use commonware_cryptography:: Hasher ;
1922use commonware_runtime:: { Clock , Metrics , Storage } ;
2023use core:: num:: NonZeroU64 ;
24+ use std:: collections:: HashMap ;
2125
2226/// Type alias for the authenticated journal used by [Db].
2327pub ( crate ) type AuthenticatedLog < E , C , H > = authenticated:: Journal < E , C , H > ;
2428
29+ /// Snapshot mutation needed to undo one operation while rewinding.
30+ enum SnapshotUndo < K > {
31+ Replace {
32+ key : K ,
33+ old_loc : Location ,
34+ new_loc : Location ,
35+ } ,
36+ Remove {
37+ key : K ,
38+ old_loc : Location ,
39+ } ,
40+ Insert {
41+ key : K ,
42+ new_loc : Location ,
43+ } ,
44+ }
45+
2546/// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values.
2647/// Consider using one of the following specialized variants instead, which may be more ergonomic:
2748/// - [crate::qmdb::any::ordered::fixed::Db]
@@ -191,6 +212,152 @@ where
191212 self . historical_proof ( self . log . size ( ) . await , loc, max_ops)
192213 . await
193214 }
215+
216+ /// Rewind the database to `size` operations, where `size` is the location of the next append.
217+ ///
218+ /// This rewinds both the authenticated log and the in-memory snapshot, then restores metadata
219+ /// (`last_commit_loc`, `inactivity_floor_loc`, `active_keys`) for the new tip commit.
220+ ///
221+ /// # Errors
222+ ///
223+ /// Returns an error when:
224+ /// - `size` is not a valid rewind target
225+ /// - the target's required logical range is not fully retained (for example, the target
226+ /// inactivity floor is pruned)
227+ /// - `size - 1` is not a commit operation
228+ ///
229+ /// Any error from this method is fatal for this handle. Rewind may mutate journal state before
230+ /// all in-memory structures are rebuilt. Callers must drop this database handle after any `Err`
231+ /// from `rewind` and reopen from storage.
232+ ///
233+ /// Returns the list of locations restored to active state in the snapshot.
234+ ///
235+ /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
236+ /// [`Db::sync`].
237+ pub async fn rewind ( & mut self , size : Location ) -> Result < Vec < Location > , Error > {
238+ let rewind_size = * size;
239+ let current_size = * self . last_commit_loc + 1 ;
240+
241+ if rewind_size == current_size {
242+ return Ok ( Vec :: new ( ) ) ;
243+ }
244+ if rewind_size == 0 || rewind_size > current_size {
245+ return Err ( Error :: Journal ( JournalError :: InvalidRewind ( rewind_size) ) ) ;
246+ }
247+
248+ // Read everything needed for rewind before mutating storage.
249+ let ( rewind_floor, undos, active_keys_delta) = {
250+ let reader = self . log . reader ( ) . await ;
251+ let bounds = reader. bounds ( ) ;
252+ let rewind_last_loc = Location :: new ( rewind_size - 1 ) ;
253+ if rewind_size <= bounds. start {
254+ return Err ( Error :: Journal ( JournalError :: ItemPruned ( * rewind_last_loc) ) ) ;
255+ }
256+ let rewind_last_op = reader. read ( * rewind_last_loc) . await ?;
257+ let Some ( rewind_floor) = rewind_last_op. has_floor ( ) else {
258+ return Err ( Error :: UnexpectedData ( rewind_last_loc) ) ;
259+ } ;
260+ if * rewind_floor < bounds. start {
261+ return Err ( Error :: Journal ( JournalError :: ItemPruned ( * rewind_floor) ) ) ;
262+ }
263+
264+ let mut undos = Vec :: with_capacity ( ( current_size - rewind_size) as usize ) ;
265+ let mut active_keys_delta = 0isize ;
266+ let mut prior_state_by_key: HashMap < U :: Key , Option < Location > > = HashMap :: new ( ) ;
267+
268+ // Reconstruct key state once in a single pass from the rewind floor.
269+ for loc in * rewind_floor..current_size {
270+ let op = reader. read ( loc) . await ?;
271+ let op_loc = Location :: new ( loc) ;
272+ match op {
273+ Operation :: CommitFloor ( _, _) => { }
274+ Operation :: Update ( update) => {
275+ let key = update. key ( ) . clone ( ) ;
276+ let previous_loc = prior_state_by_key. get ( & key) . copied ( ) . flatten ( ) ;
277+
278+ if loc >= rewind_size {
279+ if let Some ( previous_loc) = previous_loc {
280+ undos. push ( SnapshotUndo :: Replace {
281+ key : key. clone ( ) ,
282+ old_loc : op_loc,
283+ new_loc : previous_loc,
284+ } ) ;
285+ } else {
286+ active_keys_delta -= 1 ;
287+ undos. push ( SnapshotUndo :: Remove {
288+ key : key. clone ( ) ,
289+ old_loc : op_loc,
290+ } ) ;
291+ }
292+ }
293+
294+ prior_state_by_key. insert ( key, Some ( op_loc) ) ;
295+ }
296+ Operation :: Delete ( key) => {
297+ let previous_loc = prior_state_by_key. get ( & key) . copied ( ) . flatten ( ) ;
298+
299+ if loc >= rewind_size {
300+ if let Some ( previous_loc) = previous_loc {
301+ active_keys_delta += 1 ;
302+ undos. push ( SnapshotUndo :: Insert {
303+ key : key. clone ( ) ,
304+ new_loc : previous_loc,
305+ } ) ;
306+ }
307+ }
308+
309+ prior_state_by_key. insert ( key, None ) ;
310+ }
311+ }
312+ }
313+
314+ // Undo operations must run from newest to oldest removed operation.
315+ undos. reverse ( ) ;
316+
317+ ( rewind_floor, undos, active_keys_delta)
318+ } ;
319+
320+ // Journal rewind happens before in-memory undo application. If any later step fails, this
321+ // handle may be internally diverged and must be dropped by the caller. This step is not
322+ // restart-stable until a later commit/sync boundary.
323+ self . log . rewind ( rewind_size) . await ?;
324+
325+ let mut restored_locs = Vec :: new ( ) ;
326+ for undo in undos {
327+ match undo {
328+ SnapshotUndo :: Replace {
329+ key,
330+ old_loc,
331+ new_loc,
332+ } => {
333+ if new_loc < rewind_size {
334+ restored_locs. push ( new_loc) ;
335+ }
336+ update_known_loc ( & mut self . snapshot , & key, old_loc, new_loc) ;
337+ }
338+ SnapshotUndo :: Remove { key, old_loc } => {
339+ delete_known_loc ( & mut self . snapshot , & key, old_loc)
340+ }
341+ SnapshotUndo :: Insert { key, new_loc } => {
342+ if new_loc < rewind_size {
343+ restored_locs. push ( new_loc) ;
344+ }
345+ self . snapshot . insert ( & key, new_loc) ;
346+ }
347+ }
348+ }
349+
350+ self . active_keys = self
351+ . active_keys
352+ . checked_add_signed ( active_keys_delta)
353+ . ok_or ( Error :: DataCorrupted (
354+ "active_keys underflow while rewinding" ,
355+ ) ) ?;
356+ self . last_commit_loc = Location :: new ( rewind_size - 1 ) ;
357+ self . inactivity_floor_loc = rewind_floor;
358+
359+ Ok ( restored_locs)
360+ }
194361}
195362
196363// Functionality requiring Mutable + Persistable journal.
0 commit comments